[openib-general] client-server small message performance issues

Pete Wyckoff pw at osc.edu
Tue Oct 17 11:53:30 PDT 2006


I'm trying to understand some performance variation in an Openib
application, and wrote a small test program to simulate its
behavior.  Attached are the code and a plot of some results.  Each
dot in the plot shows the time for a single iteration in the code
explained below.

One client communicates with some number of servers.  In the plot,
anywhere from 1 to 10 servers.  There is one RC QP from the client
to each server, and no QPs between servers.  Everything is set up in
advance using TCP to exchange lid/key information.  Look for the
function "multiping" to see the client do the following:

    start timer
    foreach s in server:
	post recv to QP[s]
    foreach s in server:
	post send to QP[s], 200 bytes
    wait for 2 * numserver completions (one for each send and recv)
    stop timer

Each server meanwhile has a preposted number of receives, 20 is
plenty.  Their loop is:

    wait for receive completion
    post send to client, 200 bytes
    post recv to client QP
    wait for send completion

The results for 1000 iterations (and 20 untimed warmups), invoked as
   mpiexec -comm=none -pernode -np 11 multiping -s 200 -n 1000 ib30 > x
   egrep '^#' x
look like:

    # <numserver> <avg> +/- <stdev> median <median>, all in usec
    #  1 24.744 +/- 2.117 median 24.080 us
    #  2 30.352 +/- 2.241 median 30.041 us
    #  3 36.202 +/- 2.774 median 35.048 us
    #  4 45.475 +/- 2.347 median 45.061 us
    #  5 51.843 +/- 2.598 median 51.022 us
    #  6 58.552 +/- 2.407 median 57.936 us
    #  7 97.751 +/- 16.427 median 95.129 us
    #  8 114.346 +/- 16.568 median 113.010 us
    #  9 188.962 +/- 52.061 median 192.881 us
    # 10 230.065 +/- 48.299 median 215.054 us

Basic ping pong is 25 us.  That's fine as this is not a particularly
optimal way to communicate.  Each additional server adds 6 us.  That
seems like a lot of overhead just to do another pair of posts and
polls, but not my major complaint.  Look at the jump from 6 to 7
servers, 41 us.  Beyond that, too.  And the standard deviation
becomes huge.  A plot of the individual values shows a large spread,
not just a few outliers.

I was hoping to see each additional server add a fixed amount of
overhead to the overall time.  The same application on ethernet
starts slower, but scales much better as the number of servers is
increased.

The hardware is all Mellanox MT25204, with 18-port MT47396 switches.
I tried 11 hosts all connected to the same switch, and another 11
hosts to a different switch.  Also mixing hosts across switches.  No
perceptible changes to the results.  Also played around with QP attr
timeout and retry_count to discover that retries do happen, so the
retry_count must be at least 2, but that a timeout from 2 to 10
doesn't have an effect.  Software is stock kernel 2.6.17.6 and
libibverbs-1.0.3-1.fc4, libmthca-1.0.2-1.fc4.

Any suggestions on how to avoid these big jumps?  Explanations as to
the cause?

		-- Pete

-------------- next part --------------
/*
 * Test completion time for lots of small conversations.  Task 0 of this
 * parallel code is the "client", who does a number of iterations of a
 * test involving small transactions with "servers".  At each iteration,
 * the client pre-posts a receive on each QP, posts a small send on each,
 * then * polls until all sends and receives are completed.  Each server
 * keeps a constant number of receives posted, waits for a message to
 * arrive and immediatly responds.
 *
 * Copyright (C) Pete Wyckoff, 2006.  <pw at osc.edu>
 *
 * Built like this:
 *     gcc -O3 -c -o multiping.o multiping.c
 *     gcc -o multiping multiping.o -libverbs -lm
 *
 * Somehow get it started on many nodes, pointing them all to one
 * which is designated as the master, e.g.:
 *
 *    for i in piv002 piv004 piv006 ; do rsh -n $i multiping piv002 & done
 *
 * Mpiexec users inside a PBS job:
 *
 *    mpiexec --comm=none -pernode -nostdin multiping $(hostname)
 *
 * Bproc users could do:
 *
 *    bpsh 3-31 ./multiping n3
 *
 * Run the code with no args to see the usage() message.  Numbers
 * can be given with suffix "k", "m", or "g" to scale by 10^3, 6, or 9,
 * e.g.:  multiping -n 1k -s 1m $(hostname)
 *
 * Two environment variables adjust QP settings, e.g.:
 *   ARDMA_RETRY_COUNT=2
 *   ARDMA_TIMEOUT=4
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netdb.h>
#include <math.h>
#include <infiniband/verbs.h>

/*
 * Debugging support.
 */
#if 0
#define DEBUG_LEVEL 2
#define debug(lvl,fmt,args...) \
    do { \
	if (lvl <= DEBUG_LEVEL) \
	    info(fmt,##args); \
    } while (0)
#define assert(cond,fmt,args...) \
    do { \
	if (__builtin_expect(!(cond),0)) \
	    error(fmt,##args); \
    } while (0)
#else  /* no debug version */
#  define debug(lvl,cond,fmt,...)
#  define assert(cond,fmt,...)
#endif

/*
 * Handy macros.
 */
#define ptr_from_int64(p) (void *)(unsigned long)(p)
#define int64_from_ptr(p) (u_int64_t)(unsigned long)(p)

/*
 * Some shared variables.
 */
const char *progname;
int myid, numproc;
char *myhostname;
unsigned long pagesize;
unsigned long bufsize = 4096 * 16;
int numiter = 10;

static void __attribute__((noreturn))
usage(void)
{
    fprintf(stderr, "Usage: %s [-n <numiter>] [-s <size>] <masterhost>\n",
      progname);
    exit(1);
}

#ifdef DEBUG_LEVEL
static void __attribute__((format(printf,1,2)))
info(const char *fmt, ...)
{
    char s[2048];
    va_list ap;

    va_start(ap, fmt);
    vsprintf(s, fmt, ap);
    va_end(ap);
    fprintf(stderr, "[%d/%d %s]: %s.\n", myid, numproc, myhostname, s);
}
#endif

/*
 * Warning, non-fatal.
 */
static void
warning(const char *fmt, ...)
{
    va_list ap;

    fprintf(stderr, "[%d/%d %s]: %s: Warning: ", myid, numproc, myhostname,
      progname);
    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
    fprintf(stderr, ".\n");
}

/*
 * Error, fatal.
 */
static void
error(const char *fmt, ...)
{
    va_list ap;

    fprintf(stderr, "[%d/%d %s]: %s: Error: ", myid, numproc, myhostname,
      progname);
    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
    fprintf(stderr, ".\n");
    exit(1);
}

/*
 * Error, fatal, with the errno message.
 */
static void
error_errno(const char *fmt, ...)
{
    va_list ap;

    fprintf(stderr, "[%d/%d %s]: %s: Error: ", myid, numproc, myhostname,
      progname);
    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
    fprintf(stderr, ": %s.\n", strerror(errno));
    exit(1);
}

/*
 * Error-checking malloc.
 */
static void *
Malloc(unsigned long n)
{
    void *x;

    if (n == 0)
	error("%s: called on zero bytes", __func__);
    x = malloc(n);
    if (!x)
	error("%s: couldn't get %lu bytes", __func__, n);
    return x;
}

/*
 * For reading from a pipe, can't always get the full buf in one chunk.
 */
static ssize_t
saferead(int fd, void *buf, size_t num)
{
    int i, offset = 0;
    int total = num;

    while (num > 0) {
	i = read(fd, (char *)buf + offset, num);
	if (i < 0)
	    error_errno("%s: %d bytes", __func__, num);
	if (i == 0) {
	    if (offset == 0) return 0;  /* end of file on a block boundary */
	    error("EOF in saferead, only %d of %d bytes", offset, total);
	}
	num -= i;
	offset += i;
    }
    return total;
}

static ssize_t
safewrite(int fd, const void *buf, size_t num)
{
    int i, offset = 0;
    int total = num;

    while (num > 0) {
	i = write(fd, (const char *)buf + offset, num);
	if (i < 0)
	    error_errno("%s: %d bytes", __func__, num);
	if (i == 0)
	    error("EOF in safewrite, only %d of %d bytes", offset, total);
	num -= i;
	offset += i;
    }
    return total;
}

static unsigned long
parse_number(const char *cp)
{
    unsigned long v;
    char *cq;

    v = strtoul(cp, &cq, 0);
    if (*cq) {
	if (!strcasecmp(cq, "k"))
	    v *= 1000;
	else if (!strcasecmp(cq, "m"))
	    v *= 1000000;
	else if (!strcasecmp(cq, "g"))
	    v *= 1000000000;
	else
	    usage();
    }
    return v;
}

/*
 * Find the next power of two above or equal to this number.  32-bit.
 * Fails on n<=0.
 */
static int
higher_power_of_2(int n)
{
    int x = n;
    x |= (x >> 1);
    x |= (x >> 2);
    x |= (x >> 4);
    x |= (x >> 8);
    x |= (x >> 16);
    x = (x >> 1) + 1;
    if (x != n)
	x <<= 1;
    return x;
}

/*
 * Set the program name, first statement of code usually.
 */
static void
set_progname(int argc __attribute__ ((unused)), char *const argv[])
{
    const char *cp;
    char s[1024];

    for (cp=progname=argv[0]; *cp; cp++)
	if (*cp == '/')
	    progname = cp+1;
    if (gethostname(s, sizeof(s)) < 0)
	error_errno("%s: gethostname", __func__);
    s[sizeof(s)-1] = '\0';
    myhostname = strdup(s);
}

/* constants used to initialize infiniband device */
static const int IB_PORT = 1;
static const unsigned int IB_NUM_CQ_ENTRIES = 30;
static const enum ibv_mtu IB_MTU = IBV_MTU_1024;  /* default mtu */

/* IB device vars */
static struct ibv_context *nic_handle;  /* NIC reference */
static struct ibv_pd *nic_pd;  /* single protection domain for all memory/QP */
static struct ibv_cq *nic_cq;  /* single completion queue for all QPs */
static uint16_t nic_lid;

/* TCP connection management */
static const unsigned short int port = 5207;
static int fd;  /* non-master only */
static int *fds;  /* master only */

/* barrier code */
static int numproc_higher_power_of_2;

typedef struct {
    int      id;  /* "myid" assigned to the other end of this qp */
    uint16_t lid;
    uint32_t qp_num;
    void *   recv_buf;
    uint32_t rkey;
} remote_info_t;

typedef struct {
    struct ibv_qp * qp;
    uint32_t        qp_num;
    void *          send_buf;
    void *          recv_buf;
    uint32_t        send_lkey;
    uint32_t        recv_lkey;
    uint32_t        recv_rkey;
    remote_info_t   remote_info;
} qp_t;
static qp_t *qps;

static struct {
    char hostname[16];
    uint16_t lid;
} *host;

static const char *masterhost = 0;

static void barrier(void);
static void ib_init(void);
static void ib_build_qp(qp_t *qp);
static void ib_bringup_qp(qp_t *qp);

static void full_connect_socket(void);
static void multiping(int np);

static void rdma_send(qp_t *to, void *buf, unsigned int len, int offset);
static void post_send(qp_t *to);
static void post_recv(qp_t *from);
static void reap_completion(void);

int
main(int argc, char **argv)
{
    int i;

    set_progname(argc, argv);
    pagesize = getpagesize();
    myid = -1;
    numproc = 0;
    if (strlen(myhostname) + 1 > sizeof(*host))
	error("%s: my hostname too big for static structure", __func__);

    while (++argv, --argc > 0) {
	const char *cp;
	if (**argv == '-') switch ((*argv)[1]) {
	    case 'n':
		cp = *argv + 2;
		for (i = 1; *cp && *cp == "numiter"[i]; ++cp, ++i)  ;
		if (*cp) usage();
		if (++argv, --argc == 0) usage();
		numiter = parse_number(*argv);
		break;
	    case 's':
		cp = *argv + 2;
		for (i = 1; *cp && *cp == "size"[i]; ++cp, ++i)  ;
		if (*cp) usage();
		if (++argv, --argc == 0) usage();
		bufsize = parse_number(*argv);
		break;
	} else {
	    if (masterhost) usage();
	    masterhost = *argv;
	}
    }

    if (!masterhost)
	usage();
    if (bufsize < 4 * sizeof(int)) {
	if (myid == 0)
	    error("%s: bufsize must be at least %d for barrier to work",
	          __func__, 4 * sizeof(int));
	else
	    return 1;
    }
    if (!strcmp(masterhost, myhostname))
	myid = 0;

    full_connect_socket();

    /* for barrier */
    numproc_higher_power_of_2 = higher_power_of_2(numproc);

    barrier();

    for (i=2; i<=numproc; i++)
	multiping(i);

    return 0;
}

/*
 * Use TCP to glue everybody together, with a
 * QP between each pair of hosts.
 */
static void
full_connect_socket(void)
{
    struct hostent *hp;
    struct sockaddr_in skin;
    socklen_t sin_len = sizeof(skin);
    struct timeval tv, now, delta;
    int i;

    hp = gethostbyname(masterhost);
    if (!hp)
	error("host \"%s\" not resolvable", masterhost);
    memset(&skin, 0, sin_len);
    skin.sin_family = hp->h_addrtype;
    memcpy(&skin.sin_addr, hp->h_addr_list[0], hp->h_length);
    skin.sin_port = htons(port);

    fd = socket(PF_INET, SOCK_STREAM, 0);
    if (fd < 0)
	error_errno("%s: socket", __func__);

    ib_init();

    if (myid == 0) {
	int maxproc;
	int flags;

	flags = 1;
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) < 0)
	    error_errno("setsockopt reuseaddr");
	if (bind(fd, (struct sockaddr *)&skin, sin_len) < 0)
	    error_errno("bind");
	if (listen(fd, 1024) < 0)
	    error_errno("listen");

	flags = fcntl(fd, F_GETFL);
	if (flags < 0)
	    error_errno("%s: get listen socket flags", __func__);
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
	    error_errno("%s: set listen socket nonblocking", __func__);

	maxproc = 10;
	fds = Malloc(maxproc * sizeof(*fds));
	fds[0] = -1;  /* talk to myself, no */
	numproc = 1;
	gettimeofday(&tv, 0);
	for (;;) {
	    int t = accept(fd, 0, 0);
	    if (t < 0) {
		if (errno == EAGAIN || errno == EINTR) {
		    usleep(100);
		    gettimeofday(&now, 0);
		    timersub(&now, &tv, &delta);
		    if (delta.tv_sec >= 3)  /* wait 3sec for all to connect */
			break;
		    continue;
		} else
		    error_errno("accept");
	    }
	    if (numproc == maxproc) {
		void *x = fds;
		maxproc += 10;
		fds = Malloc(maxproc * sizeof(*fds));
		if (x) {
		    memcpy(fds, x, numproc * sizeof(*fds));
		    free(x);
		}
	    }
	    fds[numproc] = t;
	    safewrite(t, &numproc, sizeof(numproc));
	    ++numproc;
	}
	close(fd);
	if (numproc == 1)
	    error("no one connected to master");

	debug(2, "%d procs", numproc);
	for (i=1; i<numproc; i++)
	    safewrite(fds[i], &numproc, sizeof(numproc));

    } else {
	gettimeofday(&tv, 0);
	for (;;) {
	    if (connect(fd, (struct sockaddr *)&skin, sin_len) == 0)
		break;
	    if (errno == ECONNREFUSED || errno == EINTR) {
		usleep(100);
		gettimeofday(&now, 0);
		timersub(&now, &tv, &delta);
		if (delta.tv_sec >= 30)
		    error("failed to connect to master");
	    } else
		error_errno("connect");
	}
	saferead(fd, &myid, sizeof(myid));
	saferead(fd, &numproc, sizeof(numproc));
    }

    host = Malloc(numproc * sizeof(*host));
    qps = Malloc(numproc * sizeof(*qps));

    /* allbroadcast hostname/lid */
    strcpy(host[myid].hostname, myhostname);
    host[myid].lid = nic_lid;
    if (myid == 0) {
	for (i=1; i<numproc; i++)
	    saferead(fds[i], &host[i], sizeof(*host));
	for (i=1; i<numproc; i++)
	    safewrite(fds[i], host, numproc * sizeof(*host));
    } else {
	safewrite(fd, &host[myid], sizeof(*host));
	saferead(fd, host, numproc * sizeof(*host));
    }

    /* build remote QPs */
    remote_info_t *infos = Malloc(numproc * sizeof(*infos));
    for (i=0; i<numproc; i++) {
	if (i == myid)
	    continue;
	ib_build_qp(&qps[i]);
	debug(4, "%s: remote info qps[%d] buf %p num %x lkey %x rkey %x",
	  __func__, i, qps[i].recv_buf,
	  qps[i].qp_num, qps[i].send_lkey, qps[i].recv_rkey);
	infos[i].id = myid;
	infos[i].lid = nic_lid;
	infos[i].qp_num = qps[i].qp_num;
	infos[i].recv_buf = qps[i].recv_buf;
	infos[i].rkey = qps[i].recv_rkey;
    }

    /* gather/scatter QP data */
    if (myid == 0) {
	remote_info_t *edarray = Malloc(numproc * numproc * sizeof(*edarray));
	for (i=0; i<numproc; i++) {
	    if (i == 0)
		memcpy(&edarray[i*numproc+0], infos,
		       numproc * sizeof(*infos));
	    else
		saferead(fds[i], &edarray[i*numproc+0],
		         numproc * sizeof(*edarray));
	}
	for (i=0; i<numproc; i++) {
	    int j;
	    for (j=0; j<numproc; j++) {
		debug(4, "info from %d to %d: buf %p num %x rkey %x",
		  i, j,
		  edarray[i*numproc+j].recv_buf,
		  edarray[i*numproc+j].qp_num,
		  edarray[i*numproc+j].rkey);
		if (j == 0)
		    memcpy(&infos[i], &edarray[i*numproc+j], sizeof(*edarray));
		else
		    safewrite(fds[j], &edarray[i*numproc+j], sizeof(*edarray));
	    }
	}
	free(edarray);
    } else {
	safewrite(fd, infos, numproc * sizeof(*infos));
	saferead(fd, infos, numproc * sizeof(*infos));
    }

    /* bring exchanged data into qps array */
    for (i=0; i<numproc; i++) {
	if (i == myid)
	    continue;
	memcpy(&qps[i].remote_info, &infos[i], sizeof(*infos));
	debug(4,
	  "adding remote to qps[%d]: lid %x buf %p num %x rkey %x id %d",
	  i, qps[i].remote_info.lid, qps[i].remote_info.recv_buf,
	  qps[i].remote_info.qp_num, qps[i].remote_info.rkey,
	  qps[i].remote_info.id);
    }
    free(infos);

    /* bringup qps */
    for (i=0; i<numproc; i++) {
	if (i == myid)
	    continue;
	ib_bringup_qp(&qps[i]);
    }
}

/*
 * Standard barrier.  Fewer levels than core-np2, but more send/recv
 * operations (which are presumably fast in IB).  Use RDMA writes to
 * a reserved region of the big buf and poll looking for expected value.
 */
static void
barrier(void)
{
    int np2 = numproc_higher_power_of_2;
    int stride = 1;
    static unsigned int generation = 0;

    debug(2, "%s: gen %d", __func__, generation);
    while (stride < np2) {
	/* every trip through this loop, each sends and receives */
	int dst = (myid + stride) % numproc;
	int src = (myid + numproc - stride) % numproc;
	volatile unsigned int *recv_buf =
	         (void *)((char *) qps[src].recv_buf + bufsize);

	debug(2, "%s: recv_buf gen %d from %d starts at %d %d", __func__,
	      generation, src, recv_buf[0], recv_buf[1]);
	rdma_send(&qps[dst], &generation, sizeof(int),
	          bufsize + (generation % 2) * sizeof(int));
	reap_completion();  /* reap the send completion */
	for (;;) {
	    /* loop until the receive finishes */
	    debug(8, "%s: recv_buf gen %d from %d now %d %d",
	          __func__, generation, src, recv_buf[0], recv_buf[1]);
	    if (recv_buf[generation % 2] == generation)
		break;
#if DEBUG_LEVEL >= 8
	    sleep(1);
#else
	    usleep(250);
#endif
	}
	stride <<= 1;
    }
    ++generation;
    debug(2, "%s: done", __func__);
}

static inline double Wtime(void)
{
    struct timeval tv;

    gettimeofday(&tv, NULL);
    return (double) tv.tv_sec + (double) tv.tv_usec / 1000000;
}

static inline int double_compare(const void *x1, const void *x2)
{
    const double *d1 = x1;
    const double *d2 = x2;
    double diff = *d1 - *d2;

    return (diff < 0. ? -1 : (diff > 0. ? 1 : 0.));
}

/*
 * Core test, np is number of servers.  Task 0 is always client.
 */
static void multiping(int np)
{
    int i, j;
    int numpost_left = 0;
    const int recv_depth = 20, warmup = 10;
    double *v = NULL, *w;
    double avg = 0., stddev = 0., median = 0.;

    if (myid == 0)
	v = Malloc(numiter * sizeof(*v));

    /* prepost */
    if (myid > 0 && myid < np) {
	numpost_left = numiter + warmup;
	for (i=0; i<recv_depth; i++) {
	    if (numpost_left == 0)
		break;
	    post_recv(&qps[0]);
	    --numpost_left;
	}
    }

    barrier();

    for (j=0; j < numiter + warmup; j++) {
	if (myid == 0) {
	    double start, end;
	    /* a "client" */

	    start = Wtime();
	    for (i=1; i<np; i++)
		post_recv(&qps[i]);
	    for (i=1; i<np; i++)
		post_send(&qps[i]);
	    for (i=1; i<np; i++) {  /* a send and recv for each */
		reap_completion();
		reap_completion();
	    }
	    end = Wtime();
	    if (j >= warmup)
		v[j-warmup] = end - start;
	}
	if (myid > 0 && myid < np) {
	    /* one of many "servers" */

	    reap_completion();   /* wait for client to send to me */
	    post_send(&qps[0]);
	    if (numpost_left > 0) {
		post_recv(&qps[0]);  /* refill prepost while waiting... */
		--numpost_left;
	    }
	    reap_completion();  /* wait for completion of my send */
	}
    }

    barrier();

    if (myid == 0) {
	if (numiter > 0) {
	    for (i=0; i<numiter; i++) {
		v[i] *= 1.0e6;  /* convert to us */
		avg += v[i];
	    }
	    avg /= (double) numiter;
	    if (numiter > 1) {
		for (i=0; i<numiter; i++) {
		    double diff = v[i] - avg;
		    stddev += diff * diff;
		}
		stddev /= (double) (numiter - 1);
		stddev = sqrt(stddev);
	    }

	    w = Malloc(numiter * sizeof(*w));
	    memcpy(w, v, numiter * sizeof(*w));
	    qsort(w, numiter, sizeof(*w), double_compare);
	    median = w[numiter/2];
	    free(w);
	}
	printf("# %2d %.3f +/- %.3f median %.3f us\n",
	       np-1, avg, stddev, median);
	for (i=0; i<numiter; i++)
	    printf("%.3f\n", v[i]);
	printf("\n");
	free(v);
    }
}

/*
 * RDMA write to this host
 */
static void
rdma_send(qp_t *to, void *buf, unsigned int len, int offset)
{
    struct ibv_sge sg;
    struct ibv_send_wr sr, *bad_wr;
    int ret;

    debug(2, "%s: to %d buf <%p, %d, %d> rkey %x", __func__,
      to->remote_info.id, buf, len, offset, to->remote_info.rkey);
    if (buf)
	memcpy(to->send_buf, buf, len);
    sg.addr = int64_from_ptr(to->send_buf);
    sg.length = buf ? len : bufsize;
    sg.lkey = to->send_lkey;

    memset(&sr, 0, sizeof(sr));
    sr.wr_id = int64_from_ptr(to);
    sr.opcode = IBV_WR_RDMA_WRITE;
    sr.send_flags = IBV_SEND_SIGNALED;
    sr.wr.rdma.remote_addr = int64_from_ptr(to->remote_info.recv_buf) + offset;
    sr.wr.rdma.rkey = to->remote_info.rkey;
    sr.sg_list = &sg;
    sr.num_sge = 1;
    sr.next = NULL;
    debug(4, "%s: lkey %x rkey %x rbuf %llx", __func__, sg.lkey,
          sr.wr.rdma.rkey, (unsigned long long) sr.wr.rdma.remote_addr);
    ret = ibv_post_send(to->qp, &sr, &bad_wr);
    if (ret < 0)
	error("%s: ibv_post_send", __func__);
}

static void post_send(qp_t *to)
{
    struct ibv_sge sg;
    struct ibv_send_wr sr, *bad_wr;
    int ret;

    debug(2, "%s: to %d", __func__, to->remote_info.id);
    sg.addr = int64_from_ptr(to->send_buf);
    sg.length = bufsize;
    sg.lkey = to->send_lkey;

    memset(&sr, 0, sizeof(sr));
    sr.wr_id = int64_from_ptr(to);
    sr.opcode = IBV_WR_SEND;
    sr.send_flags = IBV_SEND_SIGNALED;
    sr.sg_list = &sg;
    sr.num_sge = 1;
    sr.next = NULL;
    ret = ibv_post_send(to->qp, &sr, &bad_wr);
    if (ret < 0)
	error("%s: ibv_post_send", __func__);
}

static void post_recv(qp_t *from)
{
    struct ibv_sge sg;
    struct ibv_recv_wr rr, *bad_wr;
    int ret;

    debug(2, "%s: from %d", __func__, from->remote_info.id);
    sg.addr = int64_from_ptr(from->recv_buf);
    sg.length = bufsize;
    sg.lkey = from->recv_lkey;

    memset(&rr, 0, sizeof(rr));
    rr.wr_id = int64_from_ptr(from);
    rr.sg_list = &sg;
    rr.num_sge = 1;
    rr.next = NULL;
    ret = ibv_post_recv(from->qp, &rr, &bad_wr);
    if (ret < 0)
	error("%s: ibv_post_recv", __func__);
}

/*
 * Return string form of work completion status field.
 */
#define CASE(e)  case e: s = #e; break
static const char *openib_wc_status_string(int status)
{
    const char *s = "(UNKNOWN)";

    switch (status) {
	CASE(IBV_WC_SUCCESS);
	CASE(IBV_WC_LOC_LEN_ERR);
	CASE(IBV_WC_LOC_QP_OP_ERR);
	CASE(IBV_WC_LOC_EEC_OP_ERR);
	CASE(IBV_WC_LOC_PROT_ERR);
	CASE(IBV_WC_WR_FLUSH_ERR);
	CASE(IBV_WC_MW_BIND_ERR);
	CASE(IBV_WC_BAD_RESP_ERR);
	CASE(IBV_WC_LOC_ACCESS_ERR);
	CASE(IBV_WC_REM_INV_REQ_ERR);
	CASE(IBV_WC_REM_ACCESS_ERR);
	CASE(IBV_WC_REM_OP_ERR);
	CASE(IBV_WC_RETRY_EXC_ERR);
	CASE(IBV_WC_RNR_RETRY_EXC_ERR);
	CASE(IBV_WC_LOC_RDD_VIOL_ERR);
	CASE(IBV_WC_REM_INV_RD_REQ_ERR);
	CASE(IBV_WC_REM_ABORT_ERR);
	CASE(IBV_WC_INV_EECN_ERR);
	CASE(IBV_WC_INV_EEC_STATE_ERR);
	CASE(IBV_WC_FATAL_ERR);
	CASE(IBV_WC_GENERAL_ERR);
    }
    return s;
}

/*
 * Spin until one completion arrives on the CQ.
 */
static void
reap_completion(void)
{
    struct ibv_wc desc;
    qp_t *sender;

    for (;;) {
	int vret = ibv_poll_cq(nic_cq, 1, &desc);
	if (vret < 0)
	    error_errno("%s: ibv_poll_cq (%d)", __func__, vret);
	if (vret > 0)
	    break;
    }

    sender = (qp_t *) ptr_from_int64(desc.wr_id);

    if (desc.status != IBV_WC_SUCCESS)
	error("%s: entry id 0x%Lx (%s) opcode %d status %d = %s", __func__,
	  desc.wr_id,
	  numproc ? host[sender->remote_info.id].hostname : "host unknown",
	  desc.opcode,
	  desc.status, openib_wc_status_string(desc.status));

    if (desc.opcode == IBV_WC_RDMA_WRITE) {

	debug(4, "%s: rdma write to %d complete", __func__,
	  sender->remote_info.id);

    } else if (desc.opcode == IBV_WC_SEND) {

	debug(4, "%s: send to %d complete", __func__,
	  sender->remote_info.id);

    } else if (desc.opcode == IBV_WC_RECV) {

	debug(4, "%s: recv data from %d complete", __func__,
	  sender->remote_info.id);

    } else {
	error("%s: cq entry id %d opcode %d unexpected", __func__,
	  sender->remote_info.id, desc.opcode);
    }
}

static void
ib_init(void)
{
    int ret;
    struct ibv_port_attr nic_port_props;
    struct ibv_device_attr device_attr;
    int cqe_num;
    struct ibv_device **devs;
    int numdevs;

    devs = ibv_get_device_list(&numdevs);
    if (numdevs != 1)
	error("%s: expecting 1 device, not %d", __func__, numdevs);
    nic_handle = ibv_open_device(devs[0]);
    if (!nic_handle)
	error("%s: ibv_open_device", __func__);
    ibv_free_device_list(devs);

    /* connect an asynchronous event handler to look for weirdness */
    /* XXX: how? */

    /* get my lid */
    ret = ibv_query_port(nic_handle, IB_PORT, &nic_port_props);
    if (ret < 0)
	error_errno("%s: ibv_query_port", __func__);
    nic_lid = nic_port_props.lid;

    /* build a protection domain */
    nic_pd = ibv_alloc_pd(nic_handle);
    if (!nic_pd)
	error("%s: ibv_alloc_pd", __func__);
    debug(2, "%s: built pd %p", __func__, nic_pd);

    /* see how many cq entries we are allowed to have */
    ret = ibv_query_device(nic_handle, &device_attr);
    if (ret < 0)
	error_errno("%s: ibv_query_device", __func__);

    debug(4, "%s: max %d completion queue entries", __func__,
          device_attr.max_cqe);
    cqe_num = IB_NUM_CQ_ENTRIES;
    if (device_attr.max_cqe < cqe_num) {
	cqe_num = device_attr.max_cqe;
	warning("%s: hardly enough completion queue entries %d, hoping for %d",
	  __func__, device_attr.max_cqe, cqe_num);
    }

    /* build a CQ (ignore actual number returned) */
    debug(4, "%s: asking for %d completion queue entries", __func__, cqe_num);
    nic_cq = ibv_create_cq(nic_handle, cqe_num, NULL, NULL, 0);
    if (!nic_cq)
	error("%s: ibv_create_cq", __func__);
}

#define page_round(x) \
        (void *)(((unsigned long)x + pagesize - 1) & ~(pagesize - 1));

static void
ib_build_qp(qp_t *qp)
{
    void *x;
    int ret;
    struct ibv_mr *mr;
    struct ibv_qp_init_attr qp_init_attr;
    struct ibv_qp_attr attr;
    enum ibv_qp_attr_mask mask;

    /* register memory region, recv and recv buf for barriers */
    x = Malloc(bufsize + pagesize + pagesize);
    x = page_round(x);
    qp->recv_buf = x;
    mr = ibv_reg_mr(nic_pd, x, bufsize + pagesize,
                    IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    if (!mr)
	error("%s: ibv_reg_mr recv", __func__);
    qp->recv_lkey = mr->lkey;
    qp->recv_rkey = mr->rkey;
    /* init for barriers */
    memset((char *) qp->recv_buf + bufsize, 0xff, pagesize);

    /* register memory region, send */
    x = Malloc(bufsize + pagesize);
    x = page_round(x);
    memset(x, myid & 255, bufsize);
    qp->send_buf = x;
    mr = ibv_reg_mr(nic_pd, x, bufsize, 0);
    if (!mr)
	error("%s: ibv_reg_mr send", __func__);
    qp->send_lkey = mr->lkey;

    /* build qp */
    memset(&qp_init_attr, 0, sizeof(qp_init_attr));
    /* wire both send and recv to the same CQ */
    qp_init_attr.send_cq         = nic_cq;
    qp_init_attr.recv_cq         = nic_cq;
    qp_init_attr.cap.max_send_wr = 5;  /* outstanding WQEs */
    qp_init_attr.cap.max_recv_wr = 20;
    qp_init_attr.cap.max_send_sge = 1;  /* scatter/gather entries */
    qp_init_attr.cap.max_recv_sge = 1;
    qp_init_attr.qp_type = IBV_QPT_RC;
    /* only generate completion queue entries if requested */
    qp_init_attr.sq_sig_all = 0;
    qp->qp = ibv_create_qp(nic_pd, &qp_init_attr);
    if (!qp->qp)
	error("%s: ibv_create_qp", __func__);
    qp->qp_num = qp->qp->qp_num;

    /* see HCA/vip/qpm/qp_xition.h for important settings */
    /* transition qp to init */
    mask =
       IBV_QP_STATE
     | IBV_QP_ACCESS_FLAGS
     | IBV_QP_PKEY_INDEX
     | IBV_QP_PORT;
    memset(&attr, 0, sizeof(attr));
    attr.qp_state = IBV_QPS_INIT;
    attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
	    IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
    attr.pkey_index = 0;
    attr.port_num = IB_PORT;
    ret = ibv_modify_qp(qp->qp, &attr, mask);
    if (ret < 0)
	error("%s: ibv_modify_qp RST -> INIT", __func__);
}

static unsigned long
hasenv(const char *env, unsigned long v)
{
    const char *cp = getenv(env);
    if (cp)
	v = parse_number(cp);
    return v;
}

static void
ib_bringup_qp(qp_t *qp)
{
    int ret;
    struct ibv_qp_attr attr;
    enum ibv_qp_attr_mask mask;

    /* transition qp to ready-to-receive */
    mask =
       IBV_QP_STATE
     | IBV_QP_MAX_DEST_RD_ATOMIC
     | IBV_QP_AV
     | IBV_QP_PATH_MTU
     | IBV_QP_RQ_PSN
     | IBV_QP_DEST_QPN
     | IBV_QP_MIN_RNR_TIMER;
    memset(&attr, 0, sizeof(attr));
    attr.qp_state = IBV_QPS_RTR;
    attr.max_dest_rd_atomic = 1;
    attr.ah_attr.dlid = qp->remote_info.lid;
    attr.ah_attr.port_num = IB_PORT;
    attr.path_mtu = IB_MTU;
    attr.rq_psn = 0;
    attr.dest_qp_num = qp->remote_info.qp_num;
    attr.min_rnr_timer = 31;  /* rnr never happens */
    ret = ibv_modify_qp(qp->qp, &attr, mask);
    if (ret < 0)
	error("%s: ibv_modify_qp INIT -> RTR", __func__);

    /* transition qp to ready-to-send */
    mask =
       IBV_QP_STATE
     | IBV_QP_SQ_PSN
     | IBV_QP_MAX_QP_RD_ATOMIC
     | IBV_QP_TIMEOUT
     | IBV_QP_RETRY_CNT
     | IBV_QP_RNR_RETRY;
    memset(&attr, 0, sizeof(attr));
    attr.qp_state = IBV_QPS_RTS;
    attr.sq_psn = 0;
    attr.max_rd_atomic = 1;
    attr.timeout = hasenv("ARDMA_TIMEOUT", 10);  /* 4.096us * 2^10 = 4 ms */
    attr.retry_cnt = hasenv("ARDMA_RETRY_COUNT", 7);
    attr.rnr_retry = 20;  /* RNR never happens */
    ret = ibv_modify_qp(qp->qp, &attr, mask);
    if (ret < 0)
	error("%s: ibv_modify_qp RTR -> RTS", __func__);
}

-------------- next part --------------
A non-text attachment was scrubbed...
Name: res.png
Type: image/png
Size: 10916 bytes
Desc: not available
URL: <http://lists.openfabrics.org/pipermail/general/attachments/20061017/29a426a7/attachment.png>


More information about the general mailing list