[openib-general] Port of NetPIPE-3.6.2 to OpenIB userspace verbs

Roland Dreier roland at topspin.com
Tue Mar 15 14:06:29 PST 2005


I just spent a little time creating a new "ibv" module for NetPIPE
that runs on top of the userspace verbs I've been developing on the
roland-uverbs branch.  This is pretty much a straight port of the
current Mellanox VAPI "ib" module, with the main changes coming from
the fact that OpenIB doesn't support the non-standard "unsignaled
receive" extension, and the fact that a completion event thread is no
longer created automatically.

I found several bugs in the verbs support while making this work, but
it seems quite stable now, although I haven't tried all option
combinations.  I also have not had a chance to compare Mellanox VAPI
and OpenIB verbs performance on identical hardware -- it would be very
useful to see this comparison on a variety of systems.

The new ibv module is contained in the patch included below.

Thanks,
  Roland


--- NetPIPE_3.6.2.orig/makefile	2004-06-09 12:46:35.000000000 -0700
+++ NetPIPE_3.6.2/makefile	2005-03-15 13:58:08.000000000 -0800
@@ -229,6 +229,10 @@
         -DINFINIBAND -DTCP -I $(VAPI_INC) -L $(VAPI_LIB) \
         -lmpga -lvapi -lpthread
 
+ibv: $(SRC)/ibv.c $(SRC)/netpipe.c $(SRC)/netpipe.h 
+	$(CC) $(CFLAGS) $(SRC)/ibv.c $(SRC)/netpipe.c -o NPibv \
+        -DOPENIB -DTCP -libverbs
+
 atoll: $(SRC)/atoll.c $(SRC)/netpipe.c $(SRC)/netpipe.h
 	$(CC) $(CFLAGS) -DATOLL $(SRC)/netpipe.c \
         $(SRC)/atoll.c -o NPatoll \
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ NetPIPE_3.6.2/src/ibv.c	2005-03-15 13:30:03.000000000 -0800
@@ -0,0 +1,1072 @@
+/*****************************************************************************/
+/* "NetPIPE" -- Network Protocol Independent Performance Evaluator.          */
+/* Copyright 1997, 1998 Iowa State University Research Foundation, Inc.      */
+/*                                                                           */
+/* This program is free software; you can redistribute it and/or modify      */
+/* it under the terms of the GNU General Public License as published by      */
+/* the Free Software Foundation.  You should have received a copy of the     */
+/* GNU General Public License along with this program; if not, write to the  */
+/* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */
+/*                                                                           */
+/*       ibv.c             ---- Infiniband module for OpenIB verbs           */
+/*****************************************************************************/
+
+#define USE_VOLATILE_RPTR /* needed for polling on last byte of recv buffer */
+#include    "netpipe.h"
+#include    <stdio.h>
+#include    <getopt.h>
+#include    <pthread.h>
+
+/* Debugging output macro */
+
+FILE* logfile;
+
+#if 0
+#define LOGPRINTF(_format, _aa...) fprintf(logfile, "%s: " _format, __func__ , ##_aa); fflush(logfile)
+#else
+#define LOGPRINTF(_format, _aa...)
+#endif
+
+/* Header files needed for Infiniband */
+
+#include    <infiniband/verbs.h>
+
+/* Global vars */
+
+static struct ibv_device      *hca;
+static struct ibv_context     *ctx;
+static struct ibv_port_attr    hca_port;
+static int                     port_num;
+static uint16_t                lid;
+static uint16_t                d_lid;
+static struct ibv_pd          *pd_hndl;
+static int                     num_cqe;
+static int                     act_num_cqe;
+static struct ibv_cq          *s_cq_hndl;
+static struct ibv_cq          *r_cq_hndl;
+static struct ibv_mr          *s_mr_hndl;
+static struct ibv_mr          *r_mr_hndl;
+static struct ibv_qp_init_attr qp_init_attr;
+static struct ibv_qp          *qp_hndl;
+static uint32_t                d_qp_num;
+static struct ibv_qp_attr      qp_attr;
+static struct ibv_wc           wc;
+static int                     max_wq=50000;
+static void*                   remote_address;
+static uint32_t                remote_key;
+static volatile int            receive_complete;
+static pthread_t               thread;
+
+/* Function definitions */
+
+/* Init -- fill in the module defaults; pargc and pargv are not used here. */
+void Init(ArgStruct *p, int* pargc, char*** pargv)
+{
+   p->rcv = 1;                           /* start out as the receiver ...    */
+   p->tr  = 0;                           /* ... and not as the transmitter   */
+
+   p->prot.comptype = NP_COMP_LOCALPOLL; /* completion via local polling     */
+   p->prot.commtype = NP_COMM_RDMAWRITE; /* data moved with RDMA writes      */
+   p->prot.ib_mtu   = IBV_MTU_1024;      /* path MTU of 1024 bytes           */
+}
+
+/* Setup -- validate the option combination, create the TCP control socket
+ * (connecting side when p->tr is set, bound server side otherwise), let
+ * establish() complete the connection and then bring up InfiniBand with
+ * initIB().  Every failure terminates the process.
+ */
+void Setup(ArgStruct *p)
+{
+ int sockfd;
+ struct sockaddr_in *lsin1, *lsin2;      /* ptr to sockaddr_in in ArgStruct */
+ char *host;
+ struct hostent *addr;
+ struct protoent *proto;
+ char logfilename[80];
+
+ /* Sanity check */
+ if( p->prot.commtype == NP_COMM_RDMAWRITE &&
+     p->prot.comptype != NP_COMP_LOCALPOLL ) {
+   fprintf(stderr, "Error, RDMA Write may only be used with local polling.\n");
+   fprintf(stderr, "Try using RDMA Write With Immediate Data with vapi polling\n");
+   fprintf(stderr, "or event completion\n");
+   exit(-1);
+ }
+
+ if( p->prot.commtype != NP_COMM_RDMAWRITE &&
+     p->prot.comptype == NP_COMP_LOCALPOLL ) {
+   fprintf(stderr, "Error, local polling may only be used with RDMA Write.\n");
+   fprintf(stderr, "Try using vapi polling or event completion\n");
+   exit(-1);
+ }
+
+ /* Open log file (only written to when LOGPRINTF is enabled) */
+ snprintf(logfilename, sizeof(logfilename), ".iblog%d", 1 - p->tr);
+ logfile = fopen(logfilename, "w");
+
+ host = p->host;                           /* copy ptr to hostname */
+
+ lsin1 = &(p->prot.sin1);
+ lsin2 = &(p->prot.sin2);
+
+ bzero((char *) lsin1, sizeof(*lsin1));
+ bzero((char *) lsin2, sizeof(*lsin2));
+
+ if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+   printf("NetPIPE: can't open stream socket! errno=%d\n", errno);
+   exit(-4);
+ }
+
+ if(!(proto = getprotobyname("tcp"))){
+   printf("NetPIPE: protocol 'tcp' unknown!\n");
+   exit(555);
+ }
+
+ if (p->tr){                                  /* if client i.e., Sender */
+   /* Try the name as a dotted-quad address first; anything inet_addr()
+    * rejects (including host names that begin with a digit) is resolved
+    * through gethostbyname() instead.
+    */
+   lsin1->sin_addr.s_addr = inet_addr(host);
+
+   if (lsin1->sin_addr.s_addr != (in_addr_t)(-1)) { /* Numerical IP address */
+     lsin1->sin_family = AF_INET;
+   } else {
+     if ((addr = gethostbyname(host)) == NULL){
+       printf("NetPIPE: invalid hostname '%s'\n", host);
+       exit(-5);
+     }
+
+     lsin1->sin_family = addr->h_addrtype;
+     bcopy(addr->h_addr, (char*) &(lsin1->sin_addr.s_addr), addr->h_length);
+   }
+
+   lsin1->sin_port = htons(p->port);
+
+ } else {                                 /* we are the receiver (server) */
+
+   bzero((char *) lsin1, sizeof(*lsin1));
+   lsin1->sin_family      = AF_INET;
+   lsin1->sin_addr.s_addr = htonl(INADDR_ANY);
+   lsin1->sin_port        = htons(p->port);
+
+   if (bind(sockfd, (struct sockaddr *) lsin1, sizeof(*lsin1)) < 0){
+     printf("NetPIPE: server: bind on local address failed! errno=%d\n", errno);
+     exit(-6);
+   }
+
+ }
+
+ if(p->tr)
+   p->commfd = sockfd;
+ else
+   p->servicefd = sockfd;
+
+ /* Establish tcp connections */
+
+ establish(p);
+
+ /* Initialize InfiniBand through the OpenIB verbs */
+
+ if(initIB(p) == -1) {
+   CleanUp(p);
+   exit(-1);
+ }
+}
+
+void event_handler(struct ibv_cq *cq);
+
+/* Thread body: fetch CQ events one at a time and pass each signalled CQ on
+ * to event_handler().  Returns NULL once ibv_get_cq_event() reports failure. */
+void *EventThread(void *unused)
+{
+  struct ibv_cq *ev_cq;
+  void *ev_ctx;
+  for (;;) {
+    if (ibv_get_cq_event(ctx, 0, &ev_cq, &ev_ctx) != 0) break;
+    event_handler(ev_cq);
+  }
+  fprintf(stderr, "Failed to get CQ event\n");
+  return NULL;
+}
+
+/* initIB -- open the first InfiniBand device found, query port 1, and
+ * create the protection domain, both completion queues and an RC queue
+ * pair.  The LID and QP number are swapped with the peer over p->commfd,
+ * after which the QP is moved through INIT, RTR and RTS.  In event
+ * completion mode the event thread is started as well.
+ * Returns 0 on success and -1 on the first failure. */
+int initIB(ArgStruct *p)
+{
+  struct dlist *dev_list;
+  int ret;
+
+  dev_list = ibv_get_devices();
+  dlist_start(dev_list);
+  hca = dlist_next(dev_list);
+  if (!hca) {
+    fprintf(stderr, "Couldn't find any InfiniBand devices\n");
+    return -1;
+  } else {
+    LOGPRINTF("Found Infiniband HCA %s\n", ibv_get_device_name(hca));
+  }
+
+  ctx = ibv_open_device(hca);
+  if (!ctx) {
+    fprintf(stderr, "Couldn't create InfiniBand context\n");
+    return -1;
+  } else {
+    LOGPRINTF("Opened Infiniband HCA %s\n", ibv_get_device_name(hca));
+  }
+
+  /* Get HCA properties */
+
+  port_num=1;
+  ret = ibv_query_port(ctx, port_num, &hca_port);
+  if(ret) {
+    fprintf(stderr, "Error querying Infiniband HCA\n");
+    return -1;
+  } else {
+    LOGPRINTF("Queried Infiniband HCA\n");
+  }
+  lid = hca_port.lid;
+  LOGPRINTF("  lid = %d\n", lid);
+
+  /* Allocate Protection Domain */
+
+  pd_hndl = ibv_alloc_pd(ctx);
+  if(!pd_hndl) {
+    fprintf(stderr, "Error allocating PD\n");
+    return -1;
+  } else {
+    LOGPRINTF("Allocated Protection Domain\n");
+  }
+
+  /* Create send completion queue */
+
+  num_cqe = 30000; /* Requested number of completion q elements */
+  s_cq_hndl = ibv_create_cq(ctx, num_cqe, NULL);
+  if(!s_cq_hndl) {
+    fprintf(stderr, "Error creating send CQ\n");
+    return -1;
+  } else {
+    act_num_cqe = s_cq_hndl->cqe;
+    LOGPRINTF("Created Send Completion Queue with %d elements\n", act_num_cqe);
+  }
+
+  /* Create recv completion queue */
+
+  num_cqe = 20000; /* Requested number of completion q elements */
+  r_cq_hndl = ibv_create_cq(ctx, num_cqe, NULL);
+  if(!r_cq_hndl) {
+    fprintf(stderr, "Error creating recv CQ\n");
+    return -1;
+  } else {
+    act_num_cqe = r_cq_hndl->cqe;
+    LOGPRINTF("Created Recv Completion Queue with %d elements\n", act_num_cqe);
+  }
+
+  /* No memory regions are registered here; MyMalloc() does that later */
+
+  /* Create Queue Pair */
+
+  qp_init_attr.cap.max_recv_wr    = max_wq; /* Max outstanding WR on RQ      */
+  qp_init_attr.cap.max_send_wr    = max_wq; /* Max outstanding WR on SQ      */
+  qp_init_attr.cap.max_recv_sge   = 1; /* Max scatter/gather entries on RQ */
+  qp_init_attr.cap.max_send_sge   = 1; /* Max scatter/gather entries on SQ */
+  qp_init_attr.recv_cq            = r_cq_hndl; /* CQ handle for RQ         */
+  qp_init_attr.send_cq            = s_cq_hndl; /* CQ handle for SQ         */
+  qp_init_attr.sq_sig_all         = 0; /* Signalling type */
+  qp_init_attr.qp_type            = IBV_QPT_RC; /* Transmission type         */
+
+  qp_hndl = ibv_create_qp(pd_hndl, &qp_init_attr);
+  if(!qp_hndl) {
+    fprintf(stderr, "Error creating Queue Pair\n");
+    return -1;
+  } else {
+    LOGPRINTF("Created Queue Pair\n");
+  }
+
+  /* Exchange lid and qp_num with other node */
+
+  if( write(p->commfd, &lid, sizeof(lid) ) != sizeof(lid) ) {
+    fprintf(stderr, "Failed to send lid over socket\n");
+    return -1;
+  }
+  if( write(p->commfd, &qp_hndl->qp_num, sizeof(qp_hndl->qp_num) ) != sizeof(qp_hndl->qp_num) ) {
+    fprintf(stderr, "Failed to send qpnum over socket\n");
+    return -1;
+  }
+  if( read(p->commfd, &d_lid, sizeof(d_lid) ) != sizeof(d_lid) ) {
+    fprintf(stderr, "Failed to read lid from socket\n");
+    return -1;
+  }
+  if( read(p->commfd, &d_qp_num, sizeof(d_qp_num) ) != sizeof(d_qp_num) ) {
+    fprintf(stderr, "Failed to read qpnum from socket\n");
+    return -1;
+  }
+
+  LOGPRINTF("Local: lid=%d qp_num=%d Remote: lid=%d qp_num=%d\n",
+         lid, qp_hndl->qp_num, d_lid, d_qp_num);
+
+
+  /* Bring up Queue Pair */
+
+  /******* INIT state ******/
+
+  qp_attr.qp_state = IBV_QPS_INIT;
+  qp_attr.pkey_index = 0;
+  qp_attr.port_num = port_num;
+  qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
+
+  ret = ibv_modify_qp(qp_hndl, &qp_attr,
+		      IBV_QP_STATE              |
+		      IBV_QP_PKEY_INDEX         |
+		      IBV_QP_PORT               |
+		      IBV_QP_ACCESS_FLAGS);
+  if(ret) {
+    fprintf(stderr, "Error modifying QP to INIT\n");
+    return -1;
+  }
+
+  LOGPRINTF("Modified QP to INIT\n");
+
+  /******* RTR (Ready-To-Receive) state *******/
+
+  qp_attr.qp_state = IBV_QPS_RTR;
+  qp_attr.max_dest_rd_atomic = 1;
+  qp_attr.dest_qp_num = d_qp_num;
+  qp_attr.ah_attr.sl = 0;
+  qp_attr.ah_attr.is_global = 0;
+  qp_attr.ah_attr.dlid = d_lid;
+  qp_attr.ah_attr.static_rate = 0;
+  qp_attr.ah_attr.src_path_bits = 0;
+  qp_attr.ah_attr.port_num = port_num;
+  qp_attr.path_mtu = p->prot.ib_mtu;
+  qp_attr.rq_psn = 0;
+  qp_attr.pkey_index = 0;
+  qp_attr.min_rnr_timer = 5;
+
+  ret = ibv_modify_qp(qp_hndl, &qp_attr,
+		      IBV_QP_STATE              |
+		      IBV_QP_AV                 |
+		      IBV_QP_PATH_MTU           |
+		      IBV_QP_DEST_QPN           |
+		      IBV_QP_RQ_PSN             |
+		      IBV_QP_MAX_DEST_RD_ATOMIC |
+		      IBV_QP_MIN_RNR_TIMER);
+
+  if(ret) {
+    fprintf(stderr, "Error modifying QP to RTR\n");
+    return -1;
+  }
+
+  LOGPRINTF("Modified QP to RTR\n");
+
+  /* Sync before going to RTS state */
+  Sync(p);
+
+  /******* RTS (Ready-to-Send) state *******/
+
+  qp_attr.qp_state = IBV_QPS_RTS;
+  qp_attr.sq_psn = 0;
+  qp_attr.timeout = 31;
+  qp_attr.retry_cnt = 1;
+  qp_attr.rnr_retry = 1;
+  qp_attr.max_rd_atomic = 1;
+
+  ret = ibv_modify_qp(qp_hndl, &qp_attr,
+		      IBV_QP_STATE              |
+		      IBV_QP_TIMEOUT            |
+		      IBV_QP_RETRY_CNT          |
+		      IBV_QP_RNR_RETRY          |
+		      IBV_QP_SQ_PSN             |
+		      IBV_QP_MAX_QP_RD_ATOMIC);
+
+  if(ret) {
+    fprintf(stderr, "Error modifying QP to RTS\n");
+    return -1;
+  }
+
+  LOGPRINTF("Modified QP to RTS\n");
+
+  /* If using event completion, start the event thread and request the
+   * initial notification on the receive CQ */
+  if( p->prot.comptype == NP_COMP_EVENT ) {
+    if (pthread_create(&thread, NULL, EventThread, NULL)) {
+      fprintf(stderr, "Couldn't start event thread\n");
+      return -1;
+    }
+    ibv_req_notify_cq(r_cq_hndl, 0);
+  }
+  return 0;
+}
+
+/* finalizeIB -- tear down the InfiniBand state built by initIB/MyMalloc.
+ *
+ * Resources are released in this order, skipping any handle that is NULL:
+ *   1. queue pair
+ *   2. receive completion queue
+ *   3. send completion queue
+ *   4. send and receive memory regions
+ *   5. protection domain
+ *   6. device context
+ * A failing step is reported on stderr and the teardown carries on.
+ * Always returns 0; p is not used.
+ */
+int finalizeIB(ArgStruct *p)
+{
+  LOGPRINTF("Finalizing IB stuff\n");
+
+  /* Queue pair */
+  if(qp_hndl != NULL) {
+    LOGPRINTF("Destroying QP\n");
+    if(ibv_destroy_qp(qp_hndl) != 0)
+      fprintf(stderr, "Error destroying Queue Pair\n");
+  }
+
+  /* Receive completion queue */
+  if(r_cq_hndl != NULL) {
+    LOGPRINTF("Destroying Recv CQ\n");
+    if(ibv_destroy_cq(r_cq_hndl) != 0)
+      fprintf(stderr, "Error destroying recv CQ\n");
+  }
+
+  /* Send completion queue */
+  if(s_cq_hndl != NULL) {
+    LOGPRINTF("Destroying Send CQ\n");
+    if(ibv_destroy_cq(s_cq_hndl) != 0)
+      fprintf(stderr, "Error destroying send CQ\n");
+  }
+
+  /* Send memory region -- may still be registered if the user bailed out */
+  if(s_mr_hndl != NULL) {
+    LOGPRINTF("Deregistering send buffer\n");
+    if(ibv_dereg_mr(s_mr_hndl) != 0)
+      fprintf(stderr, "Error deregistering send mr\n");
+  }
+
+  /* Receive memory region -- same remark as for the send region */
+  if(r_mr_hndl != NULL) {
+    LOGPRINTF("Deregistering recv buffer\n");
+    if(ibv_dereg_mr(r_mr_hndl) != 0)
+      fprintf(stderr, "Error deregistering recv mr\n");
+  }
+
+  /* Protection domain */
+  if(pd_hndl != NULL) {
+    LOGPRINTF("Deallocating PD\n");
+    if(ibv_dealloc_pd(pd_hndl) != 0)
+      fprintf(stderr, "Error deallocating PD\n");
+  }
+
+  /* Device context: the application only releases its handle on the HCA */
+  if(ctx != NULL) {
+    LOGPRINTF("Releasing HCA\n");
+    if(ibv_close_device(ctx) != 0)
+      fprintf(stderr, "Error releasing HCA\n");
+  }
+
+  return 0;
+}
+
+/* Drain completions from cq on behalf of the event thread.  Each successful
+ * work completion is reported to the main thread by setting
+ * receive_complete to 1; a polling error or a failed completion is fatal.
+ */
+void event_handler(struct ibv_cq *cq)
+{
+  int ret;
+  int rearmed = 0;   /* set once notification has been re-requested */
+
+  while(1) {
+
+    ret = ibv_poll_cq(cq, 1, &wc);
+
+    if(ret == 0) {
+       /* A completion that lands between the empty poll and the re-arm
+        * raises no event, so poll one more time after re-arming.
+        */
+       if(rearmed) return;
+       LOGPRINTF("Empty completion queue, requesting next notification\n");
+       ibv_req_notify_cq(r_cq_hndl, 0);
+       rearmed = 1;
+       continue;
+    } else if(ret < 0) {
+       fprintf(stderr, "Error in event_handler, polling cq\n");
+       exit(-1);
+    } else if(wc.status != IBV_WC_SUCCESS) {
+       fprintf(stderr, "Error in event_handler, on returned work completion "
+               "status: %d\n", wc.status);
+       exit(-1);
+    }
+
+    LOGPRINTF("Retrieved work completion\n");
+
+    /* Do not overwrite a completion the main thread has not consumed yet. */
+    while(receive_complete != 0) sched_yield();
+    receive_complete = 1;
+  }
+}
+
+/* Read exactly len bytes from fd into obuf, retrying after short reads.
+ * Returns len on success; otherwise the last read() result (0 at end of
+ * file, negative on error).  A len of 0 or less yields 0. */
+static int
+readFully(int fd, void *obuf, int len)
+{
+  char *dst = (char *) obuf;
+  int remaining;
+  int got = 0;
+
+  for (remaining = len; remaining > 0; remaining -= got, dst += got) {
+    got = read(fd, (void *) dst, remaining);
+    if (got <= 0)
+      return got;
+  }
+  return (got <= 0) ? got : len;
+}
+
+/* Rendezvous with the peer over the TCP control socket by exchanging the
+ * string "SyncMe"; exits with status 3 on I/O failure, EOF or a mismatch.
+ * readFully() returns 0 at end of file, hence the "!= len" tests below. */
+void Sync(ArgStruct *p)
+{
+    char s[] = "SyncMe", response[7];
+    int len = (int) strlen(s);   /* the terminating NUL is not sent */
+    if (write(p->commfd, s, len) != len ||
+        readFully(p->commfd, response, len) != len) {
+        perror("NetPIPE: error writing or reading synchronization string");
+        exit(3);
+    }
+    if (strncmp(s, response, len)) {
+        fprintf(stderr, "NetPIPE: Synchronization string incorrect!\n");
+        exit(3);
+    }
+}
+
+/* Post one receive work request covering p->bufflen bytes at p->r_ptr.
+ * Nothing is posted for plain RDMA write with local polling, since the
+ * receiver then polls the buffer contents directly (see RecvData).
+ * On a posting error the module is cleaned up and the process exits.
+ */
+void PrepareToReceive(ArgStruct *p)
+{
+  int                ret;          /* Return code */
+  struct ibv_recv_wr rr = { 0 };   /* Receive request; zeroed so wr_id is defined */
+  struct ibv_recv_wr *bad_wr;
+  struct ibv_sge     sg_entry;     /* Scatter/Gather list - holds buff addr */
+
+  /* We don't need to post a receive if doing RDMA write with local polling */
+
+  if( p->prot.commtype == NP_COMM_RDMAWRITE &&
+      p->prot.comptype == NP_COMP_LOCALPOLL )
+     return;
+
+  rr.num_sge = 1;
+  rr.sg_list = &sg_entry;
+  rr.next = NULL;
+
+  sg_entry.lkey = r_mr_hndl->lkey;
+  sg_entry.length = p->bufflen;
+  sg_entry.addr = (uintptr_t)p->r_ptr;
+
+  ret = ibv_post_recv(qp_hndl, &rr, &bad_wr);
+  if(ret) {
+    fprintf(stderr, "Error posting recv request\n");
+    CleanUp(p);
+    exit(-1);
+  }
+  LOGPRINTF("Posted recv request\n");
+
+  /* In event mode, clear the flag that event_handler() sets on completion. */
+  if( p->prot.comptype == NP_COMP_EVENT )
+    receive_complete = 0;
+}
+
+/* Post one send work request for p->bufflen bytes at p->s_ptr, using the
+ * opcode selected by p->prot.commtype.  For the RDMA write variants the
+ * target is the peer's buffer at the same offset as s_ptr within s_buff.
+ * A failed post is only reported on stderr; an unknown commtype exits. */
+void SendData(ArgStruct *p)
+{
+  int                ret;        /* Return code */
+  struct ibv_send_wr sr = { 0 }; /* Send request; zeroed so wr_id/imm_data are defined */
+  struct ibv_send_wr *bad_wr;
+  struct ibv_sge     sg_entry;   /* Scatter/Gather list - holds buff addr */
+
+  if(p->prot.commtype == NP_COMM_SENDRECV) {
+     sr.opcode = IBV_WR_SEND;
+     LOGPRINTF("Doing regular send\n");
+  } else if(p->prot.commtype == NP_COMM_SENDRECV_WITH_IMM) {
+     sr.opcode = IBV_WR_SEND_WITH_IMM;
+     LOGPRINTF("Doing regular send with imm\n");
+  } else if(p->prot.commtype == NP_COMM_RDMAWRITE) {
+     sr.opcode = IBV_WR_RDMA_WRITE;
+     sr.wr.rdma.remote_addr = (uintptr_t)((char *)remote_address + (p->s_ptr - p->s_buff));
+     sr.wr.rdma.rkey = remote_key;
+     LOGPRINTF("Doing RDMA write (raddr=%p)\n", (void *)(uintptr_t)sr.wr.rdma.remote_addr);
+  } else if(p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM) {
+     sr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+     sr.wr.rdma.remote_addr = (uintptr_t)((char *)remote_address + (p->s_ptr - p->s_buff));
+     sr.wr.rdma.rkey = remote_key;
+     LOGPRINTF("Doing RDMA write with imm (raddr=%p)\n", (void *)(uintptr_t)sr.wr.rdma.remote_addr);
+  } else {
+     fprintf(stderr, "Error, invalid communication type in SendData\n");
+     exit(-1);
+  }
+
+  sr.send_flags = 0; /* This needed due to a bug in Mellanox HW rel a-0 */
+  sr.num_sge = 1;
+  sr.sg_list = &sg_entry;
+  sr.next = NULL;
+
+  sg_entry.lkey = s_mr_hndl->lkey; /* Local memory region key */
+  sg_entry.length = p->bufflen;
+  sg_entry.addr = (uintptr_t)p->s_ptr;
+
+  ret = ibv_post_send(qp_hndl, &sr, &bad_wr);
+  if(ret) {
+    fprintf(stderr, "Error posting send request\n");
+  } else {
+    LOGPRINTF("Posted send request\n");
+  }
+}
+
+/* Wait until the message expected at p->r_ptr has arrived.  How arrival is
+ * detected depends on the mode:
+ *   - NP_COMM_RDMAWRITE: spin on the last byte of the receive buffer;
+ *   - NP_COMP_EVENT:     spin on the flag set by event_handler();
+ *   - otherwise:         poll the receive CQ for a work completion.
+ * A failed poll or an unsuccessful completion status ends the program.
+ */
+void RecvData(ArgStruct *p)
+{
+  int n_polled;
+
+  LOGPRINTF("Receiving at buffer address %p\n", p->r_ptr);
+
+  /*
+   * Unsignaled receives are not supported, so a completion must always be
+   * consumed from the CQ, except when using RDMA writes.
+   */
+  if( p->prot.commtype == NP_COMM_RDMAWRITE ) {
+
+    /* Marker value expected in the final byte once the data is in */
+    const int arrived = 'a' + (p->cache ? 1 - p->tr : 1);
+
+    LOGPRINTF("Waiting for last byte of data to arrive\n");
+
+    /* Busy wait; this relies on r_ptr being volatile-qualified (see the
+     * USE_VOLATILE_RPTR define), so the byte is re-read on every pass. */
+    while(p->r_ptr[p->bufflen-1] != arrived)
+      ;
+
+    /* Put the marker byte back to its "not yet arrived" value */
+    p->r_ptr[p->bufflen-1] = 'a' + (p->cache ? p->tr : 0);
+
+    LOGPRINTF("Received all of data\n");
+    return;
+  }
+
+  if( p->prot.comptype == NP_COMP_EVENT ) {
+
+    /* The event handler raises receive_complete when the receive
+     * finishes; spin on that flag instead of touching the CQ here.
+     * A semaphore could replace the busy polling.
+     */
+    LOGPRINTF("Polling receive flag\n");
+
+    while( receive_complete == 0 )
+      ;
+
+    /* In prepost-burst mode PrepareToReceive is not called between
+     * ping-pongs, so the flag has to be cleared here.
+     */
+    if( p->preburst ) receive_complete = 0;
+
+    LOGPRINTF("Receive completed\n");
+    return;
+  }
+
+  /* Remaining mode: poll the receive completion queue directly */
+
+  LOGPRINTF("Polling completion queue for VAPI work completion\n");
+
+  do {
+    n_polled = ibv_poll_cq(r_cq_hndl, 1, &wc);
+  } while(n_polled == 0);
+
+  if(n_polled < 0) {
+    fprintf(stderr, "Error in RecvData, polling for completion\n");
+    exit(-1);
+  }
+
+  if(wc.status != IBV_WC_SUCCESS) {
+    fprintf(stderr, "Error in status of returned completion: %d\n", wc.status);
+    exit(-1);
+  }
+
+  LOGPRINTF("Retrieved successful completion\n");
+}
+
+/* Reset is used after a trial to empty the work request queues so we
+ * have enough room for the next trial to run.  Each side posts an empty
+ * receive, synchronizes over TCP, sends an empty signaled message and
+ * then waits for both its send and its receive completion.  Any verbs
+ * error terminates the program.
+ */
+void Reset(ArgStruct *p)
+{
+
+  int                ret;        /* Return code */
+  struct ibv_send_wr sr = { 0 }; /* Send request (zeroed: no stale fields) */
+  struct ibv_send_wr *bad_sr;
+  struct ibv_recv_wr rr = { 0 }; /* Recv request (zeroed: no stale fields) */
+  struct ibv_recv_wr *bad_rr;
+
+  /* If comptype is event, then we'll use event handler to detect receive,
+   * so initialize receive_complete flag
+   */
+  if(p->prot.comptype == NP_COMP_EVENT) receive_complete = 0;
+
+  /* Prepost receive */
+  rr.num_sge = 0;
+  rr.next = NULL;
+
+  LOGPRINTF("Posting recv request in Reset\n");
+  ret = ibv_post_recv(qp_hndl, &rr, &bad_rr);
+  if(ret) {
+    fprintf(stderr, "  Error posting recv request\n");
+    CleanUp(p);
+    exit(-1);
+  }
+
+  /* Make sure both nodes have preposted receives */
+  Sync(p);
+
+  /* Post Send */
+  sr.opcode = IBV_WR_SEND;
+  sr.send_flags = IBV_SEND_SIGNALED;
+  sr.num_sge = 0;
+  sr.next = NULL;
+
+  LOGPRINTF("Posting send request \n");
+  ret = ibv_post_send(qp_hndl, &sr, &bad_sr);
+  if(ret) {
+    fprintf(stderr, "  Error posting send request in Reset\n");
+    exit(-1);
+  }
+
+  LOGPRINTF("Polling for completion of send request\n");
+  /* wc describes this send only once ibv_poll_cq() has returned an entry */
+  ret = 0;
+  while(ret == 0)
+    ret = ibv_poll_cq(s_cq_hndl, 1, &wc);
+
+  if(ret < 0) {
+    fprintf(stderr, "Error polling CQ for send in Reset\n");
+    exit(-1);
+  }
+  if(wc.status != IBV_WC_SUCCESS) {
+     fprintf(stderr, "  Error in completion status: %d\n",
+             wc.status);
+     exit(-1);
+  }
+
+  LOGPRINTF("Status of send completion: %d\n", wc.status);
+
+  if(p->prot.comptype == NP_COMP_EVENT) {
+     /* If using event completion, the event handler will set receive_complete
+      * when it gets the completion event.
+      */
+     LOGPRINTF("Waiting for receive_complete flag\n");
+     while(receive_complete == 0) { /* BUSY WAIT */ }
+  } else {
+     LOGPRINTF("Polling for completion of recv request\n");
+     ret = 0;
+     while(ret == 0)
+       ret = ibv_poll_cq(r_cq_hndl, 1, &wc);
+
+     if(ret < 0) {
+       fprintf(stderr, "Error polling CQ for recv in Reset\n");
+       exit(-1);
+     }
+     if(wc.status != IBV_WC_SUCCESS) {
+        fprintf(stderr, "  Error in completion status: %d\n",
+                wc.status);
+        exit(-1);
+     }
+
+     LOGPRINTF("Status of recv completion: %d\n", wc.status);
+  }
+  LOGPRINTF("Done with reset\n");
+}
+
+/* Send the time *t (in seconds) to the peer over the TCP socket as a
+ * 32-bit count of microseconds in network byte order.  Exits with
+ * status 301 if write() reports an error.
+ */
+void SendTime(ArgStruct *p, double *t)
+{
+    /* seconds -> microseconds, converted to an unsigned 32-bit integer */
+    uint32_t usec = (uint32_t)(*t * 1.e6);
+    uint32_t wire = htonl(usec);
+    int      rc;
+
+    rc = write(p->commfd, (char *)&wire, sizeof(uint32_t));
+    if (rc >= 0)
+        return;
+
+    printf("NetPIPE: write failed in SendTime: errno=%d\n", errno);
+    exit(301);
+}
+
+/* Receive a time sent by SendTime(): a 32-bit microsecond count in network
+ * byte order, stored in *t as seconds.  Exits with status 302 on a read
+ * error and 303 when the full value does not arrive. */
+void RecvTime(ArgStruct *p, double *t)
+{
+    uint32_t wire;
+    int got;
+
+    got = readFully(p->commfd, (void *)&wire, sizeof(uint32_t));
+    if (got < 0) {
+        printf("NetPIPE: read failed in RecvTime: errno=%d\n", errno);
+        exit(302);
+    }
+    if (got != sizeof(uint32_t)) {
+        fprintf(stderr, "NetPIPE: partial read in RecvTime of %d bytes\n",
+                got);
+        exit(303);
+    }
+
+    /* microseconds back to seconds */
+    *t = (double)ntohl(wire) / 1.0e6;
+}
+
+/* Send the repeat count rpt to the peer over the TCP socket as a 32-bit
+ * integer in network byte order.
+ * Exits with status 304 if write() reports an error. */
+void SendRepeat(ArgStruct *p, int rpt)
+{
+  uint32_t wire = htonl((uint32_t) rpt);
+
+  if (write(p->commfd, (void *) &wire, sizeof(uint32_t)) >= 0)
+    return;
+
+  printf("NetPIPE: write failed in SendRepeat: errno=%d\n", errno);
+  exit(304);
+}
+
+/* Receive the repeat count sent by SendRepeat() and store it in *rpt.
+ * Exits with status 305 on a read error and 306 when the full 32-bit
+ * value does not arrive. */
+void RecvRepeat(ArgStruct *p, int *rpt)
+{
+  uint32_t wire;
+  int got;
+
+  got = readFully(p->commfd, (void *)&wire, sizeof(uint32_t));
+  if (got < 0) {
+    printf("NetPIPE: read failed in RecvRepeat: errno=%d\n", errno);
+    exit(305);
+  }
+  if (got != sizeof(uint32_t)) {
+    fprintf(stderr, "NetPIPE: partial read in RecvRepeat of %d bytes\n",
+            got);
+    exit(306);
+  }
+
+  *rpt = ntohl(wire);   /* back to host byte order */
+}
+
+/* Complete the TCP control connection.  The transmitter connects
+ * p->commfd to the address in p->prot.sin1; the receiver listens on
+ * p->servicefd and stores the accepted socket in p->commfd.  Exits when
+ * connect() or accept() fails. */
+void establish(ArgStruct *p)
+{
+ socklen_t clen = sizeof(p->prot.sin2);  /* accept() takes a socklen_t * */
+
+ if(p->tr){
+   if(connect(p->commfd, (struct sockaddr *) &(p->prot.sin1),
+              sizeof(p->prot.sin1)) < 0){
+     printf("Client: Cannot Connect! errno=%d\n",errno);
+     exit(-10);
+   }
+ } else {
+   /* SERVER: a failed listen() shows up as a failed accept() below */
+   listen(p->servicefd, 5);
+   p->commfd = accept(p->servicefd, (struct sockaddr *) &(p->prot.sin2),
+                      &clen);
+
+   if(p->commfd < 0){
+     printf("Server: Accept Failed! errno=%d\n",errno);
+     exit(-12);
+   }
+ }
+}
+
+/* Exchange a "QUIT" handshake over the TCP control socket, close the
+ * socket(s), then release the IB resources via finalizeIB(). */
+void CleanUp(ArgStruct *p)
+{
+   char quit[5] = "QUIT";  /* writable: read() stores the peer's reply here */
+
+   if (p->tr) {
+      write(p->commfd, quit, sizeof(quit));
+      read(p->commfd, quit, sizeof(quit));
+      close(p->commfd);
+   } else {
+      read(p->commfd, quit, sizeof(quit));
+      write(p->commfd, quit, sizeof(quit));
+      close(p->commfd);
+      close(p->servicefd);
+   }
+
+   finalizeIB(p);
+}
+
+
+/* AfterAlignmentInit -- swap RDMA targets with the peer over the TCP socket.
+ *
+ * Only the RDMA write modes need this.  Each side sends its receive buffer
+ * address and the rkey of its receive memory region, then reads the peer's
+ * pair into remote_address and remote_key.  The exchange happens here, after
+ * any buffer alignment, so that the advertised address is the final one.
+ * Any I/O failure exits with -1. */
+void AfterAlignmentInit(ArgStruct *p)
+{
+  int got;
+
+  if( p->prot.commtype != NP_COMM_RDMAWRITE &&
+      p->prot.commtype != NP_COMM_RDMAWRITE_WITH_IMM )
+     return;
+
+  /* Outgoing: my receive buffer address ... */
+  if(write(p->commfd, (void *)&p->r_buff, sizeof(void*)) < 0) {
+     perror("NetPIPE: write of buffer address failed in AfterAlignmentInit");
+     exit(-1);
+  }
+
+  LOGPRINTF("Sent buffer address: %p\n", p->r_buff);
+
+  /* ... and the remote key that lets the peer reach it via IB RDMA */
+  if(write(p->commfd, (void *)&r_mr_hndl->rkey, sizeof(uint32_t)) < 0) {
+     perror("NetPIPE: write of remote key failed in AfterAlignmentInit");
+     exit(-1);
+  }
+
+  LOGPRINTF("Sent remote key: %d\n", r_mr_hndl->rkey);
+
+  /* Incoming: the peer's buffer address ... */
+  got = readFully(p->commfd, (void *)&remote_address, sizeof(void*));
+  if (got < 0) {
+     perror("NetPIPE: read of buffer address failed in AfterAlignmentInit");
+     exit(-1);
+  }
+  if (got != sizeof(void*)) {
+     perror("NetPIPE: partial read of buffer address in AfterAlignmentInit");
+     exit(-1);
+  }
+
+  LOGPRINTF("Received remote address from other node: %p\n", remote_address);
+
+  /* ... and the peer's remote key */
+  got = readFully(p->commfd, (void *)&remote_key, sizeof(uint32_t));
+  if (got < 0) {
+     perror("NetPIPE: read of remote key failed in AfterAlignmentInit");
+     exit(-1);
+  }
+  if (got != sizeof(uint32_t)) {
+     perror("NetPIPE: partial read of remote key in AfterAlignmentInit");
+     exit(-1);
+  }
+
+  LOGPRINTF("Received remote key from other node: %d\n", remote_key);
+}
+
+
+/* Allocate and register the message buffers.  p->r_buff holds
+ * bufflen + MAX(soffset, roffset) bytes; p->s_buff is a separate block of
+ * bufflen + soffset bytes unless p->cache is set, in which case it aliases
+ * p->r_buff.  Both are registered with the protection domain: the receive
+ * region for local and remote write, the send region for local write.
+ * Exits with -1 if an allocation or a registration fails.
+ */
+void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset)
+{
+  const int r_len = bufflen + MAX(soffset, roffset);
+  const int s_len = bufflen + soffset;
+
+  /* Allocate buffers */
+
+  p->r_buff = malloc(r_len);
+  if(p->r_buff == NULL) {
+    fprintf(stderr, "Error malloc'ing buffer\n");
+    exit(-1);
+  }
+
+  if(!p->cache) {
+    p->s_buff = malloc(s_len);
+    if(p->s_buff == NULL) {
+      fprintf(stderr, "Error malloc'ing buffer\n");
+      exit(-1);
+    }
+  } else {
+    /* Infiniband spec says the same memory may be registered more than
+     * once, so share one buffer and register it twice below.
+     */
+    p->s_buff = p->r_buff;
+  }
+
+  /* Register buffers with Infiniband */
+
+  r_mr_hndl = ibv_reg_mr(pd_hndl, p->r_buff, r_len,
+                         IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+  if(r_mr_hndl == NULL) {
+    fprintf(stderr, "Error registering recv buffer\n");
+    exit(-1);
+  }
+  LOGPRINTF("Registered Recv Buffer\n");
+
+  s_mr_hndl = ibv_reg_mr(pd_hndl, p->s_buff, s_len,
+                         IBV_ACCESS_LOCAL_WRITE);
+  if(s_mr_hndl == NULL) {
+    fprintf(stderr, "Error registering send buffer\n");
+    exit(-1);
+  }
+  LOGPRINTF("Registered Send Buffer\n");
+}
+/* Deregister both memory regions (when registered) and free the buffers.
+ * A handle is reset to NULL only after a successful deregistration.
+ * buff1 and buff2 may be NULL, and may be the same pointer (MyMalloc
+ * aliases the send and receive buffers in cache mode); a shared buffer
+ * is freed once.
+ */
+void FreeBuff(char *buff1, char *buff2)
+{
+  if(s_mr_hndl) {
+    LOGPRINTF("Deregistering send buffer\n");
+    if(ibv_dereg_mr(s_mr_hndl)) {
+      fprintf(stderr, "Error deregistering send mr\n");
+    } else {
+      s_mr_hndl = NULL;
+    }
+  }
+
+  if(r_mr_hndl) {
+    LOGPRINTF("Deregistering recv buffer\n");
+    if(ibv_dereg_mr(r_mr_hndl)) {
+      fprintf(stderr, "Error deregistering recv mr\n");
+    } else {
+      r_mr_hndl = NULL;
+    }
+  }
+
+  free(buff1);             /* free(NULL) is a no-op */
+  if(buff2 != buff1)
+    free(buff2);
+}
+
--- NetPIPE_3.6.2.orig/src/netpipe.c	2004-06-22 12:38:41.000000000 -0700
+++ NetPIPE_3.6.2/src/netpipe.c	2005-03-15 12:36:44.000000000 -0800
@@ -142,7 +142,7 @@
 
             case 's': streamopt = 1;
                       printf("Streaming in one direction only.\n\n");
-#if defined(TCP) && ! defined(INFINIBAND) 
+#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB)
                       printf("Sockets are reset between trials to avoid\n");
                       printf("degradation from a collapsing window size.\n\n");
 #endif
@@ -168,7 +168,7 @@
             case 'u': end = atoi(optarg);
                       break;
 
-#if defined(TCP) && ! defined(INFINIBAND)
+#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB)
             case 'b': /* -b # resets the buffer size, -b 0 keeps system defs */
                       args.prot.sndbufsz = args.prot.rcvbufsz = atoi(optarg);
                       break;
@@ -178,7 +178,7 @@
                          /* end will be maxed at sndbufsz+rcvbufsz */
                       printf("Passing data in both directions simultaneously.\n");
                       printf("Output is for the combined bandwidth.\n");
-#if defined(TCP) && ! defined(INFINIBAND)
+#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB)
                       printf("The socket buffer size limits the maximum test size.\n\n");
 #endif
                       if( streamopt ) {
@@ -270,7 +270,29 @@
                           exit(-1);
                       }
                       break;
+#endif
+
+#if defined(OPENIB)
+            case 'm': switch(atoi(optarg)) {
+                        case 256: args.prot.ib_mtu = IBV_MTU_256;
+                          break;
+                        case 512: args.prot.ib_mtu = IBV_MTU_512;
+                          break;
+                        case 1024: args.prot.ib_mtu = IBV_MTU_1024;
+                          break;
+                        case 2048: args.prot.ib_mtu = IBV_MTU_2048;
+                          break;
+                        case 4096: args.prot.ib_mtu = IBV_MTU_4096;
+                          break;
+                        default: 
+                          fprintf(stderr, "Invalid MTU size, must be one of "
+                                          "256, 512, 1024, 2048, 4096\n");
+                          exit(-1);
+                      }
+                      break;
+#endif
 
+#if defined(OPENIB) || defined(INFINIBAND)
             case 't': if( !strcmp(optarg, "send_recv") ) {
                          printf("Using Send/Receive communications\n");
                          args.prot.commtype = NP_COMM_SENDRECV;
@@ -317,7 +339,7 @@
             case 'n': nrepeat_const = atoi(optarg);
                       break;
 
-#if defined(TCP) && ! defined(INFINIBAND)
+#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB)
             case 'r': args.reset_conn = 1;
                       printf("Resetting connection after every trial\n");
                       break;
@@ -331,7 +353,7 @@
 
 #endif /* ! defined TCGMSG */
 
-#if defined(INFINIBAND)
+#if defined(OPENIB) || defined(INFINIBAND)
    asyncReceive = 1;
    fprintf(stderr, "Preposting asynchronous receives (required for Infiniband)\n");
    if(args.bidir && (
@@ -377,7 +399,7 @@
       end = args.upper;
       if( args.tr ) {
          printf("The upper limit is being set to %d Bytes\n", end);
-#if defined(TCP) && ! defined(INFINIBAND)
+#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB)
          printf("due to socket buffer size limitations\n\n");
 #endif
    }  }
@@ -990,7 +1012,7 @@
 void PrintUsage()
 {
     printf("\n NETPIPE USAGE \n\n");
-#if ! defined(INFINIBAND)
+#if ! defined(INFINIBAND) && !defined(OPENIB)
     printf("a: asynchronous receive (a.k.a. preposted receive)\n");
 #endif
     printf("B: burst all preposts before measuring performance\n");
@@ -998,7 +1020,7 @@
     printf("b: specify TCP send/receive socket buffer sizes\n");
 #endif
 
-#if defined(INFINIBAND)
+#if defined(INFINIBAND) || defined(OPENIB)
     printf("c: specify type of completion <-c type>\n"
            "   valid types: local_poll, vapi_poll, event\n"
            "   default: local_poll\n");
@@ -1010,7 +1032,7 @@
     printf("   all MPI-2 implementations\n");
 #endif
 
-#if defined(TCP) || defined(INFINIBAND)
+#if defined(TCP) || defined(INFINIBAND) || defined(OPENIB)
     printf("h: specify hostname of the receiver <-h host>\n");
 #endif
 
@@ -1019,7 +1041,7 @@
     printf("i: Do an integrity check instead of measuring performance\n");
     printf("l: lower bound start value e.g. <-l 1>\n");
 
-#if defined(INFINIBAND)
+#if defined(INFINIBAND) || defined(OPENIB)
     printf("m: set MTU for Infiniband adapter <-m mtu_size>\n");
     printf("   valid sizes: 256, 512, 1024, 2048, 4096 (default 1024)\n");
 #endif
@@ -1030,7 +1052,7 @@
     printf("p: set the perturbation number <-p 1>\n"
            "   (default = 3 Bytes, set to 0 for no perturbations)\n");
 
-#if defined(TCP) && ! defined(INFINIBAND)
+#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB)
     printf("r: reset sockets for every trial\n");
 #endif
 
@@ -1039,7 +1061,7 @@
     printf("S: Use synchronous sends.\n");
 #endif
 
-#if defined(INFINIBAND)
+#if defined(INFINIBAND) || defined(OPENIB)
     printf("t: specify type of communications <-t type>\n"
            "   valid types: send_recv, send_recv_with_imm,\n"
            "                rdma_write, rdma_write_with_imm\n"
@@ -1056,7 +1078,7 @@
 #if defined(MPI)
     printf("   May need to use -a to choose asynchronous communications for MPI/n");
 #endif
-#if defined(TCP) && !defined(INFINIBAND)
+#if defined(TCP) && !defined(INFINIBAND) && !defined(OPENIB)
     printf("   The maximum test size is limited by the TCP buffer size/n");
 #endif
     printf("\n");
@@ -1131,7 +1153,7 @@
 
     memset(p->s_buff, 'b', nbytes+soffset);
 }
-#if !defined(INFINIBAND) && !defined(ARMCI) && !defined(LAPI) && !defined(GPSHMEM) && !defined(SHMEM) && !defined(GM)
+#if !defined(OPENIB) && !defined(INFINIBAND) && !defined(ARMCI) && !defined(LAPI) && !defined(GPSHMEM) && !defined(SHMEM) && !defined(GM) 
 
 void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset)
 {
--- NetPIPE_3.6.2.orig/src/netpipe.h	2004-06-22 12:38:41.000000000 -0700
+++ NetPIPE_3.6.2/src/netpipe.h	2005-03-14 16:20:30.000000000 -0800
@@ -27,6 +27,10 @@
 #include <ib_defs.h> /* ib_mtu_t */
 #endif
 
+#ifdef OPENIB
+#include <infiniband/verbs.h>	/* enum ibv_mtu */
+#endif
+
 #ifdef FINAL
   #define  TRIALS             7
   #define  RUNTM              0.25
@@ -73,9 +77,14 @@
       int                     commtype; /* Communications type            */
       int                     comptype; /* Completion type                */
 #endif
+#if defined(OPENIB)
+      enum ibv_mtu            ib_mtu;   /* MTU Size for Infiniband HCA    */
+      int                     commtype; /* Communications type            */
+      int                     comptype; /* Completion type                */
+#endif
   };
 
-#if defined(INFINIBAND)
+#if defined(INFINIBAND) || defined(OPENIB)
 enum completion_types {
    NP_COMP_LOCALPOLL,  /* Poll locally on last byte of data     */
    NP_COMP_VAPIPOLL,   /* Poll using vapi function              */



More information about the general mailing list