[ofa-general] FW: Updated SDP AIO test

Jim Mott jim at mellanox.com
Wed Sep 19 09:14:44 PDT 2007


In preparation for SDP updates I have reworked the ttcp.aio.c program to
include some extra options for PREADV/PWRITEV.  While kernel support for
these functions, especially for sockets is in transition from the kernel
in the current distributions to the OFED 1.3 target, there is some ugly
code at the beginning that will be removed once it is in the distributed
libaio.h.  Or maybe I should remove it now and have people only run this
with up to date libaio?

Other change is to report performance info in a comma separated format
easy to import into a spreadsheet.  An option exists to render human
readable stuff like ttcp.

Missing is the code that actually checks the data for correctness.  It
is probably something useful to add before the release (-:

Signed-off-by: Jim Mott <jim at mellanox.com>

========================================================================
====
README.txt 

sdp_aio.c
  This is a modification of ttcp.aio.c (above) that includes some new
options, mostly around vector IO, and a reformatted reporting format
that allows direct importing of results into a spreadsheet.  The
two applications are interoperable.  

  -Build instructions:
     gcc -g -o sdp_aio sdp_aio.c -laio


Usage: ./sdp_aio -t [options] host  (sending side.  Send to 'host')
       ./sdp_aio -r [options]       (receive side - default)

Common options:
    -v:         Generate (-t) or check (-r) data (default no)
    -d:         Set SO_DEBUG socket option (default no)
    -S:         Use SDP protocol sockets explicitly (default no)
    -p n:       Use port 'n' (default 5001)
    -l n:       Send and receive chunks of size 'n' (default 8K)
    -a n:       Number of IOs/request (default 1)
    -O n:       Buffer offset (default 0)
    -A n:       Buffer alignment (default 16K)
    -b n:       SO_SNDBUF and SO_RCVBUF socket buffer size

    -I n:       Use n element iovec[] and  PREADV / PWRITEV
    -x n:       Number of buffers to allocate (default -a value)
    -w n:       Set warning level (default 0 - no warnings)
Sending side (-t) options:
    -D:         Set TCP_NODELAY socket option
    -n n:       Number of buffers to send (use -n or -N; default 2K)
    -N n:       Number of seconds to run test for (use -n or -N)

Receive side (-r) options:
    -R:         Set SO_REUSEADDR socket option (default no)
    -L n:       Set RCVLOWAT (Receive low water mark) to 'n' (default no)

Output:
  Human readable with -w1

  Comma separated line:
    1 - TX/RX Role of this instance
    2 - buf_len         -l n
    3 - buf_off         -O n
    4 - buf_align       -A n
    5 - num_conc        -a n
    6 - num_sec         -N n
    7 - num_cnt         -n n
    8 - opt_iovec       -I n
    9 - opt_cnt         -x n
   10 - "Options and info"
   11 - bytes           Number of bytes transferred
   12 - calls           Number of system calls (io_submit, io_getevents)
   13 - buffs           Number of buffers transferred
   14 - us_wall         Wall clock time in uS
   15 - us_user         User space CPU time
   16 - us_sys          System CPU time

========================================================================
====
/*
 * sdp_aio - Test Linux libaio on SDP (and non-SDP) sockets.
 *
 * Based on ttcp.c; T.C. Slattery, USNA
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <ctype.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>

#include <libaio.h>

/*
 * Start of libaio.h extensions
 *
 *   The io_iocb_cmd{} enumeration in libaio.h has no entries for the
 * vector IO commands, although the kernel includes support for them.
 * The two missing opcodes are supplied here.
 */
enum {
	IO_CMD_PREADV  = 7,	/* kernel name: IOCB_CMD_PREADV */
	IO_CMD_PWRITEV = 8	/* kernel name: IOCB_CMD_PWRITEV */
};

/*
 * io_prep_preadv
 *   Fill in an iocb describing a vectored read (IO_CMD_PREADV).
 *
 * iocb		Control block to initialize (fully overwritten)
 * fd		File descriptor the request is issued on
 * iov		Array of iovec elements receiving the data
 * nr_segs	Number of elements in iov[]
 * offset	Offset stored in the request
 */
static inline void io_prep_preadv(struct iocb *iocb, int fd,
				  struct iovec *iov, int nr_segs,
				  long long offset)
{
	/* Start from an all-zero control block */
	memset(iocb, 0, sizeof(struct iocb));

	iocb->aio_lio_opcode = IO_CMD_PREADV;
	iocb->aio_fildes     = fd;

	iocb->u.v.offset = offset;
	iocb->u.v.nr     = nr_segs;
	iocb->u.v.vec    = iov;
}

/*
 * io_prep_pwritev
 *   Fill in an iocb describing a vectored write (IO_CMD_PWRITEV).
 *
 * iocb		Control block to initialize (fully overwritten)
 * fd		File descriptor the request is issued on
 * iov		Array of iovec elements supplying the data
 * nr_segs	Number of elements in iov[]
 * offset	Offset stored in the request
 */
static inline void io_prep_pwritev(struct iocb *iocb, int fd,
				   struct iovec *iov, int nr_segs,
				   long long offset)
{
	/* Start from an all-zero control block */
	memset(iocb, 0, sizeof(struct iocb));

	iocb->aio_lio_opcode = IO_CMD_PWRITEV;
	iocb->aio_fildes     = fd;

	iocb->u.v.offset = offset;
	iocb->u.v.nr     = nr_segs;
	iocb->u.v.vec    = iov;
}

/* End of libaio.h extensions */


/*
 * app_hdr
 *   Header placed at the very start of every data buffer.  The three
 * leading counters are stored in network byte order when the buffer is
 * built.
 */
struct app_hdr {
	/* Send side buffer number */
	uint32_t buf_num;
	/* Total size of sent data */
	uint32_t buf_len;
	/* Send side opt_cnt */
	uint32_t opt_cnt;
	/* Not used - keeps the header size nice */
	uint32_t filler;
	/* Used by verify only */
	uint64_t ip;
};

/*
 * buff_ptr
 *   Describes one send/receive buffer in user space, plus the optional
 * iovec[] that maps it for vector IO.
 */
struct buff_ptr {
	char *base;		/* Start of the allocation */
	char *offset;		/* First real data byte inside the allocation */
	int len;		/* Number of data bytes transferred from offset */

	struct iovec *iov;	/* iovec[] covering the data, NULL if unused */
	int icnt;		/* Number of elements in iov[] */
};

/* Belongs in a system include file - someday.  Until then, define it here. */
#if !defined(AF_INET_SDP)
# define AF_INET_SDP 27
#endif



/*
 * Repeating byte pattern XORed into the data area of each buffer.
 *
 * Stored as unsigned char so that entries above 0x7F (0xCC, 0xC5, 0x81)
 * keep their value instead of relying on an implementation-defined
 * conversion to a signed char.
 *
 * NOTE(review): the second entry is the octal constant 055 (0x2D).  It
 * may have been intended as 0x55; the value is kept as written - confirm.
 */
static const unsigned char base_pattern[] = {
	0x11, 055, 0xCC, 0x5C, 0xC5, 0x22, 0x81
};

/*
 * PARAMETERS
 *   Defaults for everything the command line can override.  Each
 * comment names the option that changes the variable.
 */

/* Role and run length */
int mode_rx   = 1;		/* -t sets 0 (send), -r sets 1 (receive) */
int num_sec   = 0;		/* -N n: seconds to run */
int num_cnt   = 2048;		/* -n n: Number of buffers to send (2K) */
int num_conc  = 1;		/* -a n: Number of concurrent IOs */

/* Buffer geometry and addressing */
int buf_len   = 8192;		/* -l n: Length of buffers to send (8K) */
int buf_off   = 0;		/* -O n: Buffer offset */
int buf_align = 16384;		/* -A n: Alignment (16K) */
short port    = 5001;		/* -p n: Port number to use */
int domain    = AF_INET;	/* -S: Use SDP protocol explicitly */

/* Flags: 0 = off, 1 = on */
int opt_ver   = 0;		/* -v: Verify data transferred */
int opt_nodel = 0;		/* -D: Set TCP_NODELAY */
int opt_reuse = 0;		/* -R: SO_REUSEADDR */
int opt_dbg   = 0;		/* -d: SO_DEBUG */
int opt_bytes = 0;		/* -B: Report bytes/sec rather than bits/sec */
int opt_lingr = 0;		/* -G: SO_LINGER so close waits for data */

/* Socket tunables: -1 means "leave the system default alone" */
int opt_sbuf  = -1;		/* -b n: SO_SNDBUF */
int opt_rbuf  = -1;		/* -b n: SO_RCVBUF */
int opt_rlow  = -1;		/* -L n: SO_RCVLOWAT */

/* Vector IO and buffer pool */
int opt_iovec = 0;		/* -I n: n elements per iovec[], use PREADV & PWRITEV */
int opt_cnt   = 0;		/* -x n: Number of unique buffers */


/*
 * Reporting state
 */
static int warn = 0;		/* How chatty on errors */
static char *name = NULL;	/* Name of the application */
static char  label[1000];	/* Holds the output (result) string */
static char *lba;		/* Append position inside label[] */


/* Global variables */
static int fd;			/* Socket used for the data transfer */
static struct sockaddr_in s_in;	/* Local (bind) or remote (connect) address */

static int cur_data = 0;	/* Index of the next data[] buffer to use */
static struct buff_ptr **data;	/* Array of buffer descriptors */

static io_context_t io_ctx = NULL;
static struct io_event  *events;	/* Completion events from io_getevents */
static struct iocb **iocbs;		/* Control blocks handed to io_submit */

static struct timeval in_time,  out_time;	/* Wall clock at start/end */
static struct rusage  in_usage, out_usage;	/* Resource usage at start/end */

/*
 * Set by timer or error to end test.  Written from signal handlers and
 * polled by the main loop, so it must be volatile as well as
 * sig_atomic_t.
 */
static volatile sig_atomic_t done = 0;

/*
 * Usage text.  Used as a printf format: the two %s conversions receive
 * the program name.  Written as adjacent string literals (one per output
 * line) so that no literal depends on backslash-newline continuation.
 */
static char usage_txt[] =
	"Usage: %s -t [options] host  (sending side.  Send to 'host')\n"
	"       %s -r [options]       (receive side - default)\n"
	"\n"
	"Common options:\n"
	"    -v:\t\tGenerate (-t) or check (-r) data (default no)\n"
	"    -d:\t\tSet SO_DEBUG socket option (default no)\n"
	"    -S:\t\tUse SDP protocol sockets explicitly (default no)\n"
	"    -G:\t\tSet SO_LINGER socket option (default no)\n"
	"    -p n:\tUse port 'n' (default 5001)\n"
	"    -l n:\tSend and receive chunks of size 'n' (default 8K)\n"
	"    -a n:\tNumber of IOs/request (default 1)\n"
	"    -O n:\tBuffer offset (default 0)\n"
	"    -A n:\tBuffer alignment (default 16K)\n"
	"    -b n:\tSO_SNDBUF and SO_RCVBUF socket buffer size\n"
	"\n"
	"    -I n:\tUse n element iovec[] and  PREADV / PWRITEV\n"
	"    -x n:\tNumber of buffers to allocate (default -a value)\n"
	"    -w n:\tSet warning level (default 0 - no warnings)\n"
	"Sending side (-t) options:\n"
	"    -D:\t\tSet TCP_NODELAY socket option\n"
	"    -n n:\tNumber of buffers to send (use -n or -N; default 2K)\n"
	"    -N n:\tNumber of seconds to run test for (use -n or -N)\n"
	"\n"
	"Receive side (-r) options:\n"
	"    -R:\t\tSet SO_REUSEADDR socket option (default no)\n"
	"    -L n:\tSet RCVLOWAT (Receive low water mark) to 'n' (default no)\n"
	"\n"
	"Output:\n"
	"  Human readable with -w1\n"
	"\n"
	"  Comma separated line:\n"
	"    1 - TX/RX Role of this instance\n"
	"    2 - buf_len\t\t-l n\n"
	"    3 - buf_off\t\t-O n\n"
	"    4 - buf_align\t-A n\n"
	"    5 - num_conc\t-a n\n"
	"    6 - num_sec\t\t-N n\n"
	"    7 - num_cnt\t\t-n n\n"
	"    8 - opt_iovec\t-I n\n"
	"    9 - opt_cnt\t\t-x n\n"
	"   10 - \"Options and info\"\n"
	"   11 - bytes\t\tNumber of bytes transferred\n"
	"   12 - calls\t\tNumber of system calls (io_submit, io_getevents)\n"
	"   13 - buffs\t\tNumber of buffers transferred\n"
	"   14 - us_wall\t\tWall clock time in uS\n"
	"   15 - us_user\t\tUser space CPU time\n"
	"   16 - us_sys\t\tSystem CPU time\n";

/*
 * die_usage
 *   Write the usage text to stderr, substituting the program name for
 * both %s conversions, then terminate with exit(-1).
 */
static void die_usage(void)
{
	char *self = name;

	fprintf(stderr, usage_txt, self, self);
	exit(-1);
}
 

/*
 * die_error
 *   Report a fatal error on stderr and terminate with exit(-1).
 *
 * msg		Text describing what failed.  When errno is non-zero the
 *		text is passed to perror(); otherwise it is printed alone.
 */
static void die_error(char *msg)
{
	if (!errno)
		fprintf(stderr, "%s\n", msg);
	else
		perror(msg);

	exit(-1);
}

/*
 * do_log
 *   Write a log line to stderr when the message level is strictly below
 * the configured warning level.
 *
 * level	Level of this message
 * msg		Text to print
 */
static void do_log(int level, char *msg)
{
	if (warn <= level)
		return;		/* Not chatty enough for this message */

	fprintf(stderr, "  --> log: %s\n", msg);
}

/* SIGPIPE handler: flag the test as finished. */
static void sig_pipe(int value)
{
	(void)value;	/* Signal number is not needed */
	done = 1;
}

/* Timer (SIGALRM) handler: flag the test as finished. */
static void sig_time(int value)
{
	(void)value;	/* Signal number is not needed */
	done = 1;
}

/*
 * new_b
 *   This function creates and initializes a single buffer.
 *
 * num		The 'number' of this buffer
 * size		The size of the data transfer from this buffer
 * offset	The offset into the buffer of the first real byte
 * align	The alignmnet (power of 2) for this buffer
 * icnt		The number of iovec elements used to map the buffer
 */
static struct buff_ptr *new_b(int num, int size, int offset, int align,
int icnt)
{
	struct buff_ptr *bp;
	struct app_hdr *ah;
	struct iovec *ip;
	char *cp, msg[200];
	int rc, i, j, alloc_size;
	unsigned char mask;

	alloc_size = size + offset;

	rc = posix_memalign((void *)&cp, align, alloc_size);
	if (rc)
		die_error("unable to allocate data buffer");
	memset(cp, 0, alloc_size);

	bp = (struct buff_ptr *)malloc(sizeof(struct buff_ptr));
	if (!bp)
		die_error("unable to allocate buffer descriptor");
	memset(bp, 0, sizeof(struct buff_ptr));

	if (icnt) {
		ip = (struct iovec *)calloc(icnt, sizeof(struct iovec));
		if (!ip)
			die_error("unable to allocate iovec");
	} else
		ip = NULL;

	/* buff_ptr describes a single send/receive buffer in user space
*/
	bp->base   = cp;
	bp->offset = offset + cp;
	bp->len    = size;
	bp->iov    = ip;
	bp->icnt   = icnt;

	cp = bp->offset;		/* Start of data buffer */
	ah = (struct app_hdr *)cp;

	/* 
	 *   Most of the buffer holds a pattern; but there is some
unique stuff
	 * at the beginning.
	 */
	ah->buf_num = htonl(num);
	ah->buf_len = htonl(size);
	ah->opt_cnt = htonl(opt_cnt);

	cp   += sizeof(struct app_hdr);
	size -= sizeof(struct app_hdr);

	mask = (unsigned char)(num % 255);
	for (i=j=0; i<size; i++) {
		cp[i] = mask ^ base_pattern[j++];
		if (j >= sizeof(base_pattern))
			j = 0;
	}

	/* If we are not doing iovec[] IO, then buffer is done */
	if (!ip) {
		sprintf(msg, "Buff %d at 0x%lX offset=0x%lX, len=%d",
num,
			(unsigned long)bp->base, (unsigned
long)bp->offset,
			bp->len);
		do_log(4, msg);

		return(bp);
	}

	sprintf(msg, "Buff %d at 0x%lX offset=0x%lX, len=%d, iov=0x%lX,
cnt=%d",
		num, (unsigned long)bp->base, (unsigned long)bp->offset,
bp->len,
		(unsigned long)bp->iov, bp->icnt);
	do_log(4, msg);

	/*
	 *   In an ideal world, we would build these buffers differently
if
	 * we were doing send than receive.  The send side would scatter

	 * the data around and use the iov to gather it, and the receive
	 * size would create it linearly.  Maybe next time.
	 */
	size = bp->len / icnt;

	for (i=0; i<icnt; i++) {
		ip = &bp->iov[i];

		ip->iov_base = bp->offset + (i * size);
		ip->iov_len  = size;
	}

	/* Fiddle the last one to make sure we cover all the data */
	ip->iov_len += (bp->len - (size * bp->icnt));

	for (i=0; i<icnt; i++) {
		ip = &bp->iov[i];

		sprintf(msg, "  iov[%d]  0x%lX  %d", i, 
			(unsigned long)ip->iov_base, (int)ip->iov_len);
		do_log(4, msg);
	}

	return(bp);
}

/*
 * tvsub
 *   Return the interval (after - before) in microseconds.
 *
 * after	Later time stamp
 * before	Earlier time stamp
 *
 * The borrow is decided on the microsecond field; the original tested
 * tv_sec and produced the right answer only through unsigned
 * wraparound.  If 'after' precedes 'before' the negative difference is
 * returned converted to uint64_t (i.e. modulo 2^64), as before.
 */
static uint64_t tvsub(struct timeval *after, struct timeval *before)
{
	int64_t sec  = (int64_t)after->tv_sec  - (int64_t)before->tv_sec;
	int64_t usec = (int64_t)after->tv_usec - (int64_t)before->tv_usec;

	/* Borrow a second when the microsecond field went backwards */
	if (usec < 0) {
		sec--;
		usec += 1000000;
	}

	return((uint64_t)(sec * 1000000 + usec));
}

/*
 * outfmt
 *   Format a byte rate for human consumption.
 *
 * b		Quantity in bytes.  Multiplied by 8 and labelled "bit"
 *		unless opt_bytes is set, in which case it is labelled "B".
 *
 * The value is scaled by 1024 up to three times (K, M, G).  Returns a
 * pointer to a static buffer that is overwritten by the next call.
 */
static char *outfmt(double b)
{
	static const char prefixes[] = { ' ', 'K', 'M', 'G' };
	static char obuf[50];
	size_t idx = 0;

	if (!opt_bytes)
		b *= 8;

	/* Stop scaling once below 1024 or when the prefixes run out */
	while (!(b < 1024.0) && idx + 1 < sizeof(prefixes)) {
		b = b / 1024.0;
		idx++;
	}

	/* snprintf: a very large double cannot overrun the static buffer */
	if (opt_bytes)
		snprintf(obuf, sizeof(obuf), "%.2f %cB", b, prefixes[idx]);
	else
		snprintf(obuf, sizeof(obuf), "%.2f %cbit", b, prefixes[idx]);

	return(obuf);
}



/*
 * summary
 *   Print the results of the run.  With a non-zero warning level a
 * human readable report is printed first; the comma separated label
 * line is always completed and printed last.
 *
 * calls	Number of system calls made
 * buffs	Number of buffers transferred
 * bytes	Number of bytes transferred
 *
 * Elapsed and CPU times come from the in_/out_ time stamps and rusage
 * snapshots taken around the transfer loop.
 */
static void summary(uint64_t calls, uint64_t buffs, uint64_t bytes)
{
	uint64_t us_user, us_wall, us_sys;
	double   realt;

	us_wall = tvsub(&out_time, &in_time);
	us_user = tvsub(&out_usage.ru_utime, &in_usage.ru_utime);
	us_sys  = tvsub(&out_usage.ru_stime, &in_usage.ru_stime);

	realt = ((double)us_wall)/1000000;
	if (realt == 0.0)
		realt = 0.001;		/* No division by zero here */

	/*
	 * uint64_t is not 'unsigned long' everywhere, so the counters
	 * are cast to unsigned long long and printed with %llu.
	 */
	if (warn) {
		printf("%llu bytes in %.2f seconds = %s/sec\n",
			(unsigned long long)bytes,
			realt,
			outfmt((double)(bytes / realt)));

		printf("%llu I/O calls, usec/call = %.2f, calls/sec = %.2f\n",
			(unsigned long long)calls,
			1000000.0 * realt/((double)calls),
			((double)calls/realt));

		printf("user: %llu sys: %llu total: %llu real: %llu\n",
			(unsigned long long)us_user,
			(unsigned long long)us_sys,
			(unsigned long long)(us_user + us_sys),
			(unsigned long long)us_wall);
	}

	/* Add an eye catching value */
	lba += sprintf(lba, "%s/sec", outfmt(((double)bytes)/realt));

	/* Close the quoted options field, then append the six counters */
	lba += sprintf(lba, "\", %llu, %llu, %llu, %llu, %llu, %llu",
		(unsigned long long)bytes,
		(unsigned long long)calls,
		(unsigned long long)buffs,
		(unsigned long long)us_wall,
		(unsigned long long)us_user,
		(unsigned long long)us_sys);

	printf("%s\n", label);
}


/*
 * setup_time
 *   Install the handlers that end the test.  When a run time was given
 * (num_sec > 0) an interval timer delivering SIGALRM is armed and "+N "
 * is appended to the label; otherwise "+n " is appended.  SIGPIPE is
 * always caught.
 */
static void setup_time(void)
{
	if (num_sec > 0) {
		struct itimerval tmr;

		lba += sprintf(lba, "+N ");

		signal(SIGALRM, sig_time);

		/* Zeroing the struct leaves both tv_usec fields at 0 */
		memset(&tmr, 0, sizeof(tmr));
		tmr.it_value.tv_sec    = num_sec;
		tmr.it_interval.tv_sec = num_sec;

		if (setitimer(ITIMER_REAL, &tmr, NULL))
			die_error("unable to set timer");
	} else {
		lba += sprintf(lba, "+n ");
	}

	/* Catch when the other side goes away */
	signal(SIGPIPE, sig_pipe);
}


static void setup_io(void)
{
	int i, rc, log_opt_cnt;
	char msg[200];

	if (!opt_cnt) {
		log_opt_cnt = 0;
		opt_cnt = num_conc;
	} else
		log_opt_cnt = 1;

	/* Calculate the buffer size needed based on -l, -O, -A */
	if ((buf_align / 2) * 2 != buf_align) 
		die_error("-A (buffer alignment) must be a positive
power of 2");

	sprintf(msg, "buffer count %d, size %d, offset %d, send length
%d",
		num_conc, buf_align, buf_off, buf_len);
	do_log(3, msg);

	lba += sprintf(lba, "%d, %d, %d, %d, %d, %d, %d, %d, \"",
	               buf_len, buf_off, buf_align, num_conc,
		       num_sec, num_cnt, opt_iovec, opt_cnt);
	lba += log_opt_cnt ? sprintf(lba, "+I ") : sprintf(lba, "-I ");
	lba += opt_ver ? sprintf(lba, "+v ") : sprintf(lba, "-v ");

	data = (struct buff_ptr **)calloc(opt_cnt, sizeof(struct
buff_ptr *));
	if (!data)
		die_error("Unable to allocate buffer pointer memory");

	for (i=0; i<opt_cnt; i++) 
		data[i] = new_b(i, buf_len, buf_off, buf_align,
opt_iovec);

	events = calloc(num_conc * 2, sizeof(struct io_event));
	if (!events)
		die_error("Unable to allocate IO events array");

	iocbs = malloc(num_conc * sizeof(struct iocb *));
	if (!iocbs)
		die_error("Unable to allocate IO CBs");
	for (i=0; i<num_conc; i++) {
		iocbs[i] = (struct iocb *)malloc(sizeof(struct iocb));
		if (!iocbs[i])
			die_error("Unable to allocate an IOCB");
	}

	rc = io_setup(num_conc + 1, &io_ctx);
	if (rc)
		die_error("unable to setup async IO");
}

static void setup_socket(char *target)
{
	int i, rc;
	char msg[200];
	int optval;
	struct hostent *addr;

	/* Verify TX/RX mode vs. options */
	if (mode_rx) {
		if (opt_nodel)
			die_error("-D (TCP_NODELAY) only works on send
side");
		if (num_sec)
			die_error("-N (num secs) only works on send
side");

		/* Receiver will only die when sender goes away */
		num_cnt = 0;
		num_sec = 0;
	} else {
		if (opt_reuse)
			die_error("-R (SO_REUSEADDR) only works on
receive");
		if (opt_rlow != -1)
			die_error("-L (receive low water mark) only on
receive");
	}

	/* Open the socket and set options requested */
	fd = socket(domain, SOCK_STREAM, 0);
	if (fd < 0)
		die_error("Unable to open socket");
	lba += (domain == AF_INET_SDP) ? sprintf(lba, "+S ") :
sprintf(lba, "-S ");

	if (opt_reuse) {
		optval = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval));
		if (rc < 0)
			die_error("unable to set SO_REUSEADDR");
	}
	lba += opt_reuse ? sprintf(lba, "+R ") : sprintf(lba, "-R ");

	if (opt_sbuf >= 0) {
		rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt_sbuf,
sizeof(opt_sbuf));
		if (rc < 0)
			die_error("unable to set SO_SNDBUF");
	}
	lba += (opt_sbuf >= 0) ? sprintf(lba, "+b ") : sprintf(lba, "-b
");

	if (opt_rbuf >= 0) {
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt_rbuf,
sizeof(opt_rbuf));
		if (rc < 0)
			die_error("unable to set SO_RCVBUF");
	}

	if (opt_rlow >= 0) {
		rc = setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, &opt_rlow,
sizeof(opt_rlow));
		if (rc < 0)
			die_error("unable to set SO_RCVLOWAT");
	}

	if (opt_dbg) {
		optval = 1;
		rc = setsockopt(fd, SOL_SOCKET, SO_DEBUG, &optval,
sizeof(optval));
		if (rc < 0)
			die_error("unable to set SO_DEBUG");
	}
	lba += opt_dbg ? sprintf(lba, "+d ") : sprintf(lba, "-d ");

	if (opt_lingr) {
		struct linger linger;

		memset(&linger, 0, sizeof(linger));
		linger.l_onoff = 1;	/* Wait for all data to go */
		linger.l_linger = 5;	/* Wait for it all */

		rc = setsockopt(fd, SOL_SOCKET, SO_LINGER, (char
*)&linger, sizeof(linger)); 
		if (rc < 0)
			die_error("unable to set SO_LINGER");
	}
	lba += opt_lingr ? sprintf(lba, "+G ") : sprintf(lba, "-G ");

	if (opt_nodel) {
		optval = 1;
		rc = setsockopt(fd, SOL_TCP, TCP_NODELAY, &optval,
sizeof(optval));
		if (rc < 0)
			die_error("unable to set TCP_NODELAY");
	}
	lba += opt_nodel ? sprintf(lba, "+D ") : sprintf(lba, "-D ");

	if (opt_iovec) {
		if (opt_iovec > UIO_MAXIOV)
			die_error("more than UIO_MAXIOV requested");
	}

	memset(&s_in, 0, sizeof(s_in));
	s_in.sin_port   = htons(port);

	if (mode_rx) {
		rc = bind(fd, (struct sockaddr *)&s_in, sizeof(s_in));
		if (rc < 0)
			die_error("unable to bind");

		rc = listen(fd, 1);
		if (rc < 0)
			die_error("unable to listen");

		i  = sizeof(s_in);
		fd = accept(fd, (struct sockaddr *)&s_in, (socklen_t
*)&i);
		if (fd < 0)
			die_error("unable to accept");

		sprintf(msg, "Accepted connection from %s",
inet_ntoa(s_in.sin_addr));
	} else {
		if (atoi(target) > 0)
			s_in.sin_addr.s_addr = inet_addr(target);
		else {
			addr = gethostbyname(target);
			if (!addr)
				die_error("unable to resolve target
host");

			memcpy((char *)&s_in.sin_addr.s_addr, (char
*)addr->h_addr,
			       sizeof(s_in.sin_addr.s_addr));
		}

		s_in.sin_family = AF_INET;

		rc = connect(fd, (struct sockaddr *)&s_in,
sizeof(s_in));
		if (rc < 0)
			die_error("unable to connect to target");

		sprintf(msg, "Connected to %s", target);
		}
	do_log(1, msg);
}

static inline int do_norm(void)
{
	int i, rc;
	struct iocb *ip;
	struct buff_ptr *bp;
	struct app_hdr *hp;

	for (i=0; i<num_conc; i++) {
		ip = iocbs[i]; 
		bp = data[cur_data++];

		if (cur_data >= opt_cnt)
			cur_data = 0;

		if (mode_rx)
			io_prep_pread(ip, fd, bp->offset, bp->len, 0);
		else
			io_prep_pwrite(ip, fd, bp->offset, bp->len, 0);
		ip->data = bp;

		if (opt_ver) {
			hp = (struct app_hdr *)bp;
			hp->ip = (uint64_t)ip;
		}
	}

	rc = io_submit(io_ctx, num_conc, iocbs);
	if (rc != num_conc) {
		if (rc > 0) {
			printf("Submitted %d, accepted %d\n", num_conc,
rc);
			perror("io_submit");
			die_error("not all normal io_submit elements
accepted");
		}
		else if (rc == 0)
			die_error("no normal io_submit elements
accepted");
		else
			die_error("error on normal io_submit");
	}

	return(num_conc);
}


static inline int do_iov(void)
{
	int i, rc;
	struct iocb *ip;
	struct buff_ptr *bp;
	struct app_hdr *hp;

	for (i=0; i<num_conc; i++) {
		ip = iocbs[i]; 
		bp = data[cur_data++];

		if (cur_data >= opt_cnt)
			cur_data = 0;

		if (mode_rx)
			io_prep_preadv(ip, fd, bp->iov, bp->icnt, 0);
		else
			io_prep_pwritev(ip, fd, bp->iov, bp->icnt, 0);
		ip->data = bp;

		if (opt_ver) {
			hp = (struct app_hdr *)bp;
			hp->ip = (uint64_t)ip;
		}
	}

	rc = io_submit(io_ctx, num_conc, iocbs);
	if (rc != num_conc) {
		if (rc > 0)
			die_error("not normal all io_submit elements
accepted");
		else if (rc == 0)
			die_error("no normal io_submit elements
accepted");
		else {
			errno = -rc;
			die_error("error on normal io_submit");
		}
	}

	return(num_conc);
}


/*
 * verify_rx
 *   Placeholder for checking a received buffer; currently does nothing.
 *
 * i		Index of the completion event to check
 */
static inline void verify_rx(int i)
{
	(void)i;	/* TODO: Check the data */
}


/*
 * main
 *   Parse the command line, set up buffers and the socket, then move
 * data until the timer fires, the requested count is reached, or the
 * peer stops transferring.  A result line is printed at the end.
 *
 * Returns 0 on a completed run; errors end the program via die_error()
 * or die_usage().
 */
int main(int argc, char *argv[])
{
	int i, rc, reaped;
	char *target;
	uint64_t cnt_calls, cnt_buffs, cnt_bytes, cnt_bytes_op;

	name = argv[0];

	/* 'B' is listed here so that its case below is reachable */
	while ((i = getopt(argc, argv,
			   "drtvRDSGBb:l:N:n:p:A:O:a:x:L:I:w:")) >= 0) {
		switch (i) {

		case 't': mode_rx=0; break;
		case 'r': mode_rx=1; break;

		case 'd': opt_dbg = 1; break;
		case 'D': opt_nodel = 1; break;
		case 'R': opt_reuse = 1; break;
		case 'v': opt_ver = 1; break;
		case 'B': opt_bytes = 1; break;
		case 'G': opt_lingr = 1; break;
		case 'S': domain = AF_INET_SDP; break;

		case 'p': port = atoi(optarg); break;
		case 'n': num_cnt = atoi(optarg); num_sec = 0; break;
		case 'N': num_sec = atoi(optarg); num_cnt = 0; break;
		case 'O': buf_off = atoi(optarg); break;
		case 'A': buf_align = atoi(optarg); break;
		case 'b': opt_sbuf = opt_rbuf = atoi(optarg); break;
		case 'L': opt_rlow = atoi(optarg); break;
		case 'a': num_conc = atoi(optarg); break;
		case 'I': opt_iovec = atoi(optarg); break;
		case 'x': opt_cnt = atoi(optarg); break;
		case 'w': warn = atoi(optarg); break;

		case 'l':
			/*
			 * Compare as int: against an unsigned sizeof a
			 * negative length would look huge and escape the
			 * clamp.
			 */
			buf_len = atoi(optarg);
			if (buf_len < (int)sizeof(struct app_hdr))
				buf_len = (int)sizeof(struct app_hdr);
			break;

		default: die_usage();
		}
	}

	memset(label, 0, sizeof(label));

	if (mode_rx) {
		lba = label + sprintf(label, "RX, ");
		target = NULL;
	} else {
		lba = label + sprintf(label, "TX, ");
		if (optind == argc)
			die_usage();

		target = argv[optind];
	}

	setup_io();
	setup_socket(target);

	/* Move some data */
	cnt_buffs = 0;
	cnt_calls = 0;
	cnt_bytes = 0;

	setup_time();

	gettimeofday(&in_time, NULL);
	getrusage(RUSAGE_SELF, &in_usage);

	while (!done) {
		/* The submit helpers return the number of buffers queued */
		cnt_buffs += (opt_iovec) ? do_iov() : do_norm();

		/*
		 * Reap every request of this batch before the iocbs and
		 * buffers are reused.  An interrupted wait (for example
		 * the -N timer) is retried, not treated as fatal; 'done'
		 * then ends the outer loop.
		 */
		reaped = 0;
		while (reaped < num_conc) {
			rc = io_getevents(io_ctx, 1, num_conc - reaped,
					  events + reaped, NULL);
			if (rc == -EINTR)
				continue;
			if (rc < 0) {
				errno = -rc;	/* negated errno from libaio */
				die_error("Error reading completions");
			}
			if (rc == 0) {
				errno = 0;
				die_error("No completions");
			}
			if (reaped + rc < num_conc)
				do_log(5, "did not get them all");
			reaped += rc;
		}

		cnt_calls++;

		/* Add up the batch; stop at the first failed or empty IO */
		cnt_bytes_op = 0;
		for (i = 0; i < reaped && 0 < (long)events[i].res; i++) {
			cnt_bytes_op += events[i].res;

			if (opt_ver && mode_rx)
				verify_rx(i);
		}

		/* Nothing moved at all: the transfer is over */
		if (!cnt_bytes_op)
			break;
		cnt_bytes += cnt_bytes_op;

		if (opt_ver &&
		    cnt_bytes_op != (uint64_t)num_conc * (uint64_t)buf_len)
			die_error("Data size mismatch");

		if (num_cnt > 0 && cnt_calls >= (uint64_t)num_cnt)
			break;
	}

	gettimeofday(&out_time, NULL);
	getrusage(RUSAGE_SELF, &out_usage);

	close(fd);

	/* There are really 2 system calls for every request */
	summary(2*cnt_calls, cnt_buffs, cnt_bytes);

	return 0;
}



More information about the general mailing list