[ewg] [PATCH 2/10] DAPL v2.0: common: new IB collective provider for Mellanox Fabric Collective Agent

Davis, Arlin R arlin.r.davis at intel.com
Wed Aug 10 17:41:45 PDT 2011


Support for bcast, barrier, reduce, allreduce, allgather, allgatherv

Signed-off-by: Arlin Davis <arlin.r.davis at intel.com>
---
 dapl/openib_common/collectives/fca_provider.c   | 1400 +++++++++++++++++++++++
 dapl/openib_common/collectives/fca_provider.h   |  100 ++
 dapl/openib_common/collectives/ib_collectives.h |  228 ++++
 3 files changed, 1728 insertions(+), 0 deletions(-)
 create mode 100644 dapl/openib_common/collectives/fca_provider.c
 create mode 100755 dapl/openib_common/collectives/fca_provider.h
 create mode 100755 dapl/openib_common/collectives/ib_collectives.h

diff --git a/dapl/openib_common/collectives/fca_provider.c b/dapl/openib_common/collectives/fca_provider.c
new file mode 100644
index 0000000..3fe4724
--- /dev/null
+++ b/dapl/openib_common/collectives/fca_provider.c
@@ -0,0 +1,1400 @@
+/*
+ * Copyright (c) 2011 Intel Corporation.  All rights reserved.
+ * 
+ * This Software is licensed under one of the following licenses:
+ * 
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ *    in the file LICENSE.txt in the root directory. The license is also
+ *    available from the Open Source Initiative, see 
+ *    http://www.opensource.org/licenses/cpl.php.
+ * 
+ * 2) under the terms of the "The BSD License" a copy of which is in the file
+ *    LICENSE2.txt in the root directory. The license is also available from
+ *    the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/bsd-license.php.
+ * 
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ *    copy of which is in the file LICENSE3.txt in the root directory. The
+ *    license is also available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/gpl-license.php.
+ * 
+ * Licensee has the right to choose one of the above licenses.
+ * 
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ * 
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+
+/*
+ * Mellanox ConnectX-2 MPI collective offload support - FCA (Fabric Collective Agent)
+ */
+
+#include <dlfcn.h>
+#include "openib_osd.h"
+#include "dapl.h"
+#include "dapl_adapter_util.h"
+#include "dapl_evd_util.h"
+#include "dapl_ib_util.h"
+#include "dapl_ep_util.h"
+#include "dapl_evd_util.h"
+#include "dapl_cookie.h"
+
+#ifdef DAT_IB_COLLECTIVES
+#ifdef DAT_FCA_PROVIDER
+
+#include <dat2/dat_ib_extensions.h>
+#include <collectives/ib_collectives.h>
+
+#define DAT_COLL_SID 0x2234
+
+static char *fca_specfile = "/tmp/fca_spec.ini";
+void *fca_lhandle = NULL;
+
+/*
+ * Pending group request: a connected socket together with the group id
+ * that was received on it, parked until a group with that id asks for it.
+ */
+struct grp_req {
+	int id;			/* group id received on the socket */
+	int sockfd;		/* connected socket descriptor */
+	struct grp_req *next;	/* next pending request, NULL at the tail */
+};
+
+/* singly linked FIFO of pending requests; both pointers NULL when empty */
+static struct grp_req *qhead = NULL;
+static struct grp_req *qtail = NULL;
+
+/*
+ * Append an (id, sockfd) pair to the tail of the pending request list.
+ *
+ * Returns 0 on success, -ENOMEM when the list node cannot be allocated.
+ */
+static int grp_req_queue(int id, int sockfd)
+{
+	struct grp_req *node = malloc(sizeof(*node));
+
+	if (!node)
+		return -ENOMEM;
+
+	node->id = id;
+	node->sockfd = sockfd;
+	node->next = NULL;
+
+	if (qtail == NULL)
+		qhead = node;		/* first entry of an empty list */
+	else
+		qtail->next = node;
+	qtail = node;
+
+	return 0;
+}
+
+/*
+ * Remove the first pending request whose id matches and hand back its socket.
+ *
+ * Returns the queued socket descriptor, or -1 when no request with that id
+ * is on the list.
+ */
+static int grp_req_dequeue(int id)
+{
+	struct grp_req *prev = NULL;
+	struct grp_req *cur;
+	int fd;
+
+	for (cur = qhead; cur != NULL; prev = cur, cur = cur->next) {
+		if (cur->id != id)
+			continue;
+
+		/* unlink the node, fixing up head and tail as needed */
+		if (prev)
+			prev->next = cur->next;
+		else
+			qhead = cur->next;
+		if (qtail == cur)
+			qtail = prev;
+
+		fd = cur->sockfd;
+		free(cur);
+		return fd;
+	}
+
+	return -1;
+}
+
+/*
+ * Translate a DAT collective data type into the matching FCA data type.
+ *
+ * Types without a mapping (including long double) yield FCA_DTYPE_LAST+1.
+ */
+static int fca_dtype( enum dat_ib_collective_data_type type )
+{
+	switch (type) {
+	case DAT_IB_COLLECTIVE_TYPE_INT8:
+		return FCA_DTYPE_CHAR;
+	case DAT_IB_COLLECTIVE_TYPE_UINT8:
+		return FCA_DTYPE_UNSIGNED_CHAR;
+	case DAT_IB_COLLECTIVE_TYPE_INT16:
+		return FCA_DTYPE_SHORT;
+	case DAT_IB_COLLECTIVE_TYPE_UINT16:
+		return FCA_DTYPE_UNSIGNED_SHORT;
+	case DAT_IB_COLLECTIVE_TYPE_INT32:
+		return FCA_DTYPE_INT;
+	case DAT_IB_COLLECTIVE_TYPE_UINT32:
+		return FCA_DTYPE_UNSIGNED;
+	case DAT_IB_COLLECTIVE_TYPE_INT64:
+		return FCA_DTYPE_LONG;
+	case DAT_IB_COLLECTIVE_TYPE_UINT64:
+		return FCA_DTYPE_UNSIGNED_LONG;
+	case DAT_IB_COLLECTIVE_TYPE_FLOAT:
+		return FCA_DTYPE_FLOAT;
+	case DAT_IB_COLLECTIVE_TYPE_DOUBLE:
+		return FCA_DTYPE_DOUBLE;
+	case DAT_IB_COLLECTIVE_TYPE_LONG_DOUBLE:
+		/* no mapping to 128-bit quadruple precision */
+	default:
+		break;
+	}
+
+	return FCA_DTYPE_LAST+1; /* unsupported */
+}
+
+/*
+ * Size in bytes of one element of the given DAT collective data type.
+ *
+ * Returns 0 for a type this function does not recognise.
+ */
+static int fca_dsize( enum dat_ib_collective_data_type type )
+{
+	switch (type) {
+	case DAT_IB_COLLECTIVE_TYPE_INT8:
+	case DAT_IB_COLLECTIVE_TYPE_UINT8:
+		return sizeof(uint8_t);
+	case DAT_IB_COLLECTIVE_TYPE_INT16:
+	case DAT_IB_COLLECTIVE_TYPE_UINT16:
+		return sizeof(uint16_t);
+	case DAT_IB_COLLECTIVE_TYPE_INT32:
+	case DAT_IB_COLLECTIVE_TYPE_UINT32:
+		return sizeof(uint32_t);
+	case DAT_IB_COLLECTIVE_TYPE_INT64:
+	case DAT_IB_COLLECTIVE_TYPE_UINT64:
+		return sizeof(uint64_t);
+	case DAT_IB_COLLECTIVE_TYPE_FLOAT:
+		return sizeof(float);
+	case DAT_IB_COLLECTIVE_TYPE_DOUBLE:
+		return sizeof(double);
+	case DAT_IB_COLLECTIVE_TYPE_LONG_DOUBLE:
+		return sizeof(long double);
+	default:
+		return 0;
+	}
+}
+/*
+ * Translate a DAT collective reduce operation into the FCA operation code.
+ *
+ * Returns 0 when the operation is not one of the cases listed below.
+ */
+static int fca_op( enum dat_ib_collective_reduce_data_op op )
+{
+	switch (op) {
+	case DAT_IB_COLLECTIVE_REDUCE_OP_MAX:
+		return FCA_OP_MAX;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_MIN:
+		return FCA_OP_MIN;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_SUM:
+		return FCA_OP_SUM;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_PROD:
+		return FCA_OP_PROD;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_LAND:
+		return FCA_OP_LAND;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_BAND:
+		return FCA_OP_BAND;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_LOR:
+		return FCA_OP_LOR;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_BOR:
+		return FCA_OP_BOR;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_LXOR:
+		return FCA_OP_LXOR;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_BXOR:
+		return FCA_OP_BXOR;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_MAXLOC:
+		return FCA_OP_MAXLOC;
+	case DAT_IB_COLLECTIVE_REDUCE_OP_MINLOC:
+		return FCA_OP_MINLOC;
+	}
+
+	return 0;
+}
+
+/*
+ * Progress callback registered with FCA (see create_service).
+ * FCA invokes it periodically from collective operation context when it
+ * blocks there for too long.  Don't call with scheduled non-blocking
+ * operations.
+ *
+ * arg is the ib_hca_transport_t registered as progress.arg.  The consumer's
+ * progress function is only run when one is set and the caller is not the
+ * thread recorded in t_id.
+ */
+void my_progress(void *arg)
+{
+	ib_hca_transport_t *tp = (ib_hca_transport_t *) arg;
+
+	if (!tp->user_func)
+		return;
+
+	if (tp->t_id == dapl_os_gettid())
+		return;
+
+	dapl_log(DAPL_DBG_TYPE_THREAD, "calling progress_func(%p)\n",tp);
+	(*tp->user_func)();
+}
+
+/* forward prototypes */
+DAT_RETURN dapli_free_collective_member(IN DAT_IA_HANDLE ia,
+					IN DAT_IB_COLLECTIVE_MEMBER member);
+
+/******************* Internal Collective Calls **************************/
+
+/*
+ * Create the FCA context for this HCA transport.
+ *
+ * Makes sure the spec file exists, parses it, fills in the rank element
+ * type and the progress callback, and calls fca_init().  On success the
+ * new context is stored in tp->m_ctx.
+ *
+ * Returns 0 on success, 1 on failure.
+ */
+static int create_service(struct dapl_hca *hca)
+{
+	ib_hca_transport_t *tp = &hca->ib_trans;
+	struct fca_init_spec *fca_spec;
+	struct fca_context *ctx;
+	FILE *fp;
+	int ret;
+
+	/* create an empty spec file if it does not exist */
+	fp = fopen(fca_specfile, "r");
+	if (fp==NULL)
+		fp = fopen(fca_specfile, "w");
+	if (fp)
+		fclose(fp);
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION, "create_service: enter(%p)\n", tp);
+
+	/* Read INI file into global structures before setting any spec */
+	fca_spec = fca_parse_spec_file(fca_specfile);
+	if (fca_spec == NULL) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 "create_service: fca_parse_spec_file(%s) ERR\n",
+			 fca_specfile);
+		return 1;
+	}
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION, "  fca_init_spec\n");
+	fca_spec->element_type = FCA_ELEMENT_RANK;
+	fca_spec->job_id = 0;
+	fca_spec->rank_id = 0;
+	fca_spec->progress.func = my_progress;
+	fca_spec->progress.arg = tp;
+
+	ret = fca_init(fca_spec, &ctx);
+
+	/* the spec is no longer needed, whether or not init succeeded */
+	fca_free_init_spec(fca_spec);
+
+	if (ret) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 "create_service: fca_init() ERR ret=%d\n", ret);
+		return 1;
+	}
+
+	tp->m_ctx = ctx;
+
+	return 0;
+}
+
+/*
+ * Build the local member record used for the out-of-band group exchange.
+ *
+ * The record (tp->m_info, tp->m_size bytes) is the FCA rank info followed
+ * by the address of a listening TCP socket (tp->l_sock).  Every process
+ * listens because rank 0 is not known yet; the port search starts at
+ * DAT_COLL_SID and moves up until a free port is found.
+ *
+ * Returns 0 on success, 1 on failure.  On failure everything acquired here
+ * is released again and the corresponding tp fields are reset.
+ */
+static int create_member(struct dapl_hca *hca)
+{
+	ib_hca_transport_t *tp = &hca->ib_trans;
+	unsigned int port;
+	int size, ret;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " create_member: tp=%p, ctx=%p\n", tp, tp->m_ctx);
+
+	if (!tp->m_ctx)
+		return 1;
+
+	/* FCA address information */
+	tp->f_info = fca_get_rank_info(tp->m_ctx, &size);
+	if (!tp->f_info) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			"create_member: fca_get_rank_info() ERR ret=%s ctx=%p\n",
+			strerror(errno), tp->m_ctx);
+		goto err;
+	}
+
+	tp->m_info = malloc(sizeof(DAT_SOCK_ADDR) + size);
+	if (!tp->m_info) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			"create_member: malloc() ERR ret=%s ctx=%p\n",
+			strerror(errno), tp->m_ctx);
+		/* f_info is released exactly once, in the err path below */
+		goto err;
+	}
+	dapl_os_memzero(tp->m_info, sizeof(DAT_SOCK_ADDR) + size);
+
+	if ((tp->l_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			"create_member: socket() ERR ret=%s \n",
+			strerror(errno));
+		goto err;
+	}
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION, " create_member listen socket\n");
+
+	/*
+	 * only rank0 needs listen, but we don't know who is rank0 yet.
+	 * Everyone listen, start on seed port until find one unused.
+	 * The candidate port is kept in host byte order and converted for
+	 * each attempt, so successive attempts really are successive ports.
+	 */
+	memcpy((void*)&tp->m_addr, (void*)&hca->hca_address, sizeof(DAT_SOCK_ADDR));
+
+	ret = -1;
+	for (port = DAT_COLL_SID; port <= 0xffff; port++) {
+		tp->m_addr.sin_port = htons((unsigned short)port);
+		ret = bind(tp->l_sock,
+			   (struct sockaddr *)&tp->m_addr,
+			   sizeof(DAT_SOCK_ADDR));
+		if (ret == 0 || errno != EADDRINUSE)
+			break;
+	}
+
+	if (ret != 0) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			"create_member: bind() ERR ret=%s \n",
+			strerror(errno));
+		goto err;
+	}
+
+	if (listen(tp->l_sock, 1024) < 0) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			"create_member: listen() ERR ret=%s \n",
+			strerror(errno));
+		goto err;
+	}
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		"create_member: listen port 0x%x,%d \n",
+		ntohs(tp->m_addr.sin_port),
+		ntohs(tp->m_addr.sin_port));
+
+	/* local fca_info and sock_addr to member buffer for MPI exchange */
+	tp->f_size = size;
+	tp->m_size = size + sizeof(DAT_SOCK_ADDR);
+	memcpy(tp->m_info, tp->f_info, size);
+	memcpy( ((char*)tp->m_info + size), &tp->m_addr, sizeof(DAT_SOCK_ADDR));
+
+	/* free rank info after getting */
+	fca_free_rank_info(tp->f_info);
+	tp->f_info = NULL;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 "create_member: m_ptr=%p, sz=%d exit SUCCESS\n",
+		 tp->m_info, tp->m_size);
+
+	return 0;
+err:
+	/* cleanup: release everything acquired above and reset the fields */
+	if (tp->f_info) {
+		fca_free_rank_info(tp->f_info);
+		tp->f_info = NULL;
+	}
+
+	if (tp->m_info) {
+		free(tp->m_info);
+		tp->m_info = NULL;
+	}
+
+	if (tp->l_sock > 0) {
+		close(tp->l_sock);
+		tp->l_sock = -1;
+	}
+
+	return 1;
+}
+
+/*
+ * Finish creation of one collective group (runs on the worker thread).
+ *
+ * Rank 0 collects one TCP connection per remote rank (from the pending
+ * request list or by accepting on the listen socket), checks the group id
+ * each peer sends and answers with the FCA comm_desc.  Connections that
+ * carry a different group id are parked on the pending request list.
+ * Every other rank connects to the first addr_info entry (rank 0), sends
+ * its group id and receives the comm_desc.
+ *
+ * With the comm_desc in hand the FCA communicator is initialised and a
+ * DAT_IB_COLLECTIVE_CREATE_DATA event is posted to the group's EVD.  The
+ * event carries DAT_OP_SUCCESS and the group handle on success, or
+ * DAT_IB_COLL_COMP_ERR and a NULL handle on failure; on failure the group
+ * is also freed, so the caller must not touch it afterwards.
+ */
+static void create_group(struct coll_group *group)
+{
+	int *conn = group->conn;
+	int i, g_id, ret = 0;
+	int off = 0;
+	char maddr_hex[32 * 9 + 1];	/* room for 32 x " %02x" of an int */
+	DAT_IB_EXTENSION_EVENT_DATA eventx;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " create_grp[%d]: group=%p, id=%d\n",
+		 group->self, group, group->id);
+
+	/* group creation event, pessimistic until everything succeeded */
+	dapl_os_memzero(&eventx, sizeof(eventx));
+	eventx.status = DAT_IB_COLL_COMP_ERR;
+	eventx.type = DAT_IB_COLLECTIVE_CREATE_DATA;
+	eventx.coll.handle = NULL;
+	eventx.coll.context = group->user_context;
+
+	/* Create and distribute group info and close connections*/
+	if (group->self == 0) {
+
+		/* accept and send all ranks comm_desc info */
+		for (i = 1; i < group->ranks; ) {
+			/* check for queue'd group id request */
+			conn[i] = grp_req_dequeue(group->id);
+			if (conn[i] < 0) {
+				conn[i] = accept(group->tp->l_sock, NULL, NULL);
+				if (conn[i] < 0) {
+					dapl_log(DAPL_DBG_TYPE_ERR,
+						 " create_grp[0]: accept() ERR: %s\n",
+						 strerror(errno));
+					goto error;
+				}
+
+				/*
+				 * Validate ID from ranks, all ranks have comm_desc.
+				 * MSG_WAITALL: a stream socket may otherwise hand
+				 * back only part of the id.
+				 */
+				ret = recv(conn[i], &g_id, sizeof(g_id), MSG_WAITALL);
+				if (ret != (int)sizeof(g_id)) {
+					dapl_log(DAPL_DBG_TYPE_ERR,
+						 " create_grp[0]: rcv g_id ERR:\n");
+					goto error;
+				}
+				/* no match, queue it for other response */
+				if (g_id != group->id) {
+					dapl_log(DAPL_DBG_TYPE_WARN,
+						 " create_grp[0]:"
+						 " rcv g_id %d != g_id %d\n",
+						 g_id, group->id);
+					/* if it cannot be parked, drop the connection */
+					if (grp_req_queue(g_id, conn[i]))
+						close(conn[i]);
+					conn[i] = 0; /* no longer owned by this slot */
+					continue; /* try conn[i] again */
+				}
+				dapl_log(DAPL_DBG_TYPE_EXTENSION,
+					 " create_grp[0]: rcv g_id %d == g_id %d\n",
+					 g_id, group->id);
+			}
+
+			/* Group match, send back FCA comm_desc information */
+			ret = send(conn[i], &group->comm_desc, sizeof(group->comm_desc), 0);
+			if (ret != (int)sizeof(group->comm_desc)) {
+				dapl_log(DAPL_DBG_TYPE_ERR,
+					 " create_grp[0]: snd %d comm: ERR:\n", i);
+				goto error;
+			}
+			i++; /* next rank */
+		}
+
+		/* all have comm_desc, close all sockets */
+		for (i = 1; i < group->ranks; ++i) {
+			close(conn[i]);
+			conn[i] = 0;
+		}
+
+	} else {
+
+		/* first group addr_info entry is rank 0 */
+		dapl_log(DAPL_DBG_TYPE_EXTENSION,
+			 " create_grp[%d]: connect -> %s 0x%x \n",
+			 group->self, inet_ntoa(group->addr_info->sin_addr),
+			 ntohs(group->addr_info->sin_port));
+
+		group->sock = socket(AF_INET, SOCK_STREAM, 0);
+		if (group->sock < 0) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " create_grp: socket() ERR: %s\n",
+				 strerror(errno));
+			group->sock = 0; /* nothing to close later */
+			goto error;
+		}
+		ret = connect(group->sock,
+			      (struct sockaddr *)group->addr_info,
+			      sizeof(*group->addr_info));
+		if (ret < 0) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " create_grp: connect() ERR: %s\n",
+				 strerror(errno));
+			goto error;
+		}
+		/* send group ID to identify with multiple groups */
+		ret = send(group->sock, &group->id, sizeof(group->id), 0);
+		if (ret != (int)sizeof(group->id)) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " create_grp: snd() ERR: %s g_id=%d\n",
+				 strerror(errno), group->id);
+			goto error;
+		}
+
+		/* recv FCA comm_desc for this group ID, complete or not at all */
+		ret = recv(group->sock, &group->comm_desc,
+			   sizeof(group->comm_desc), MSG_WAITALL);
+		if (ret != (int)sizeof(group->comm_desc)) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " create_grp: recv() ERR: %s \n",
+				 strerror(errno));
+			goto error;
+		}
+
+		/* cleanup socket resources */
+		close(group->sock);
+		group->sock = 0;
+	}
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		" create_grp[%d]: fca_comm_init_spec() ranks=%d comm_id=0x%04x"
+		" job_id=0x%lx m_type %d grp_id=%d\n",
+		group->self, group->ranks, group->comm_desc.comm_id,
+		group->comm_desc.job_id, group->comm_desc.comm_maddr.type,
+		group->id);
+
+	/* render the first 32 multicast address bytes as one log line */
+	for (i = 0; i < 32; i++)
+		off += snprintf(maddr_hex + off, sizeof(maddr_hex) - off,
+				"%s%02x", i ? " " : "",
+				group->comm_desc.comm_maddr.data[i]);
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		" create_grp[%d]: fca_comm_init_spec() m_addr -> %s\n",
+		group->self, maddr_hex);
+
+	/* init communicator, node p_idx and procs, total ranks, all ranks */
+	group->comm_init.desc = group->comm_desc;
+	group->comm_init.rank = group->self;
+	group->comm_init.size = group->ranks;
+	group->comm_init.proc_idx = group->g_info.local_rank;
+	group->comm_init.num_procs = group->g_info.local_size;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		" create_grp[%d]: fca_comm_init() ranks=%d local_rank=%d, local_size%d\n",
+		group->self, group->ranks, group->g_info.local_rank, group->g_info.local_size);
+
+	if (fca_comm_init(group->ctx, &group->comm_init, &group->comm)) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " create_grp: fca_comm_init() ERR: %s",
+			 strerror(errno));
+		goto error;
+	}
+	fca_comm_get_caps(group->comm, &group->comm_caps);
+
+	eventx.status = DAT_OP_SUCCESS;
+	eventx.coll.handle = group;
+
+error:
+	/* reached on success too; only a failure has sockets left to close */
+	if (eventx.status != DAT_OP_SUCCESS && group->self == 0 && conn) {
+		for (i = 1; i < group->ranks; i++) {
+			if (conn[i] > 0) {
+				close(conn[i]);
+				conn[i] = 0;
+			}
+		}
+	}
+
+	dapls_evd_post_event_ext(group->evd, DAT_IB_COLLECTIVE_EVENT, 0, (DAT_UINT64*)&eventx);
+	if (eventx.status != DAT_OP_SUCCESS)
+		dapli_free_collective_group((DAT_IB_COLLECTIVE_HANDLE)group);
+
+	return;
+}
+
+/* worker thread to support non-blocking group creations and operations  */
+/*
+ * Worker thread body: creates the FCA service and member record, then
+ * services group creation requests queued on tp->grp_list until the thread
+ * state leaves IB_THREAD_RUN.  The state is set to IB_THREAD_EXIT on the
+ * way out, including when service or member creation fails.
+ *
+ * Each group is taken off the list before create_group() is called,
+ * because create_group() frees the group when creation fails.  The list is
+ * only touched with coll_lock held, matching the producer side; the lock
+ * is dropped while create_group() runs.
+ */
+static void coll_thread(void *arg)
+{
+	struct coll_group *grp;
+	struct dapl_hca *hca = (struct dapl_hca*)arg;
+	ib_hca_transport_t *tp = &hca->ib_trans;
+
+	dapl_os_lock(&tp->coll_lock);
+	tp->coll_thread_state = IB_THREAD_RUN;
+	tp->t_id = dapl_os_gettid();
+
+	if (create_service(hca))
+		goto err;
+
+	if (create_member(hca))
+		goto err;
+
+	while (tp->coll_thread_state == IB_THREAD_RUN) {
+
+		dapl_os_unlock(&tp->coll_lock);
+		dapl_os_wait_object_wait(&tp->coll_event,
+					 DAT_TIMEOUT_INFINITE);
+		dapl_os_lock(&tp->coll_lock);
+
+		while (!dapl_llist_is_empty(&tp->grp_list)) {
+			grp = dapl_llist_peek_head(&tp->grp_list);
+
+			/* unlink first: grp may not exist after create_group */
+			dapl_llist_remove_entry(&tp->grp_list,
+						(DAPL_LLIST_ENTRY *)
+						&grp->list_entry);
+
+			dapl_os_unlock(&tp->coll_lock);
+			create_group(grp);
+			dapl_os_lock(&tp->coll_lock);
+		}
+	}
+err:
+	tp->coll_thread_state = IB_THREAD_EXIT;
+	dapl_os_unlock(&tp->coll_lock);
+}
+
+/*
+ * Start the collective worker thread for this HCA, once.
+ *
+ * Does nothing and returns DAT_SUCCESS when the thread state is no longer
+ * IB_THREAD_INIT.  Otherwise the thread is created and this call waits
+ * until the thread has left the IB_THREAD_CREATE state.
+ *
+ * Returns DAT_SUCCESS when the thread is up, or an error when the thread
+ * could not be created or ended in IB_THREAD_EXIT during start-up.
+ */
+static DAT_RETURN coll_thread_init(struct dapl_hca *hca)
+{
+	DAT_RETURN dat_status;
+	ib_hca_transport_t *tp = &hca->ib_trans;
+	int err, failed;
+
+	dapl_os_lock(&tp->coll_lock);
+	if (tp->coll_thread_state != IB_THREAD_INIT) {
+		dapl_os_unlock(&tp->coll_lock);
+		return DAT_SUCCESS;
+	}
+	tp->coll_thread_state = IB_THREAD_CREATE;
+	dapl_os_unlock(&tp->coll_lock);
+
+	/* thread to process group comm creation */
+	dat_status = dapl_os_thread_create(coll_thread, (void*)hca, &tp->coll_thread);
+	if (dat_status != DAT_SUCCESS) {
+		err = errno;	/* save before the lock calls can change it */
+
+		/* no thread exists: go back to INIT so a later call retries */
+		dapl_os_lock(&tp->coll_lock);
+		tp->coll_thread_state = IB_THREAD_INIT;
+		dapl_os_unlock(&tp->coll_lock);
+
+		return (dapl_convert_errno(err,
+					   "create_coll_thread ERR:"
+					   " check resource limits"));
+	}
+
+	/*
+	 * wait for thread to start; stop waiting on any state change so a
+	 * thread that fails during start-up cannot hang this call forever
+	 */
+	dapl_os_lock(&tp->coll_lock);
+	while (tp->coll_thread_state == IB_THREAD_CREATE) {
+		dapl_os_unlock(&tp->coll_lock);
+		dapl_os_sleep_usec(2000);
+		dapl_os_lock(&tp->coll_lock);
+	}
+	failed = (tp->coll_thread_state == IB_THREAD_EXIT);
+	dapl_os_unlock(&tp->coll_lock);
+
+	if (failed)
+		return (dapl_convert_errno(EFAULT,
+					   "create_coll_thread ERR:"
+					   " service startup failed"));
+
+	return DAT_SUCCESS;
+}
+
+/*
+ * Ask a running worker thread to stop and wait until it reports
+ * IB_THREAD_EXIT.  Nothing is done when the thread is not in the
+ * IB_THREAD_RUN state.
+ */
+static void coll_thread_destroy(struct dapl_hca *hca)
+{
+	ib_hca_transport_t *tp = &hca->ib_trans;
+
+	dapl_os_lock(&tp->coll_lock);
+
+	if (tp->coll_thread_state == IB_THREAD_RUN) {
+		tp->coll_thread_state = IB_THREAD_CANCEL;
+
+		/* wake the thread and poll, lock dropped, until it has gone */
+		do {
+			dapl_os_wait_object_wakeup(&tp->coll_event);
+			dapl_os_unlock(&tp->coll_lock);
+			dapl_os_sleep_usec(2000);
+			dapl_os_lock(&tp->coll_lock);
+		} while (tp->coll_thread_state != IB_THREAD_EXIT);
+	}
+
+	dapl_os_unlock(&tp->coll_lock);
+}
+
+/******************* External Collective Calls **************************/
+
+/* Create context for FCA, get adapter and port from hca_ptr */
+/*
+ * Set up the collective service state of an HCA (lock, group list, wake-up
+ * event) and start the worker thread that makes the FCA calls.
+ *
+ * Returns 0 on success, 1 when the worker thread could not be started.
+ */
+int dapli_create_collective_service(IN struct dapl_hca *hca)
+{
+	ib_hca_transport_t *tp = &hca->ib_trans;
+
+	dapl_os_lock_init(&tp->coll_lock);
+	dapl_llist_init_head(&tp->grp_list);
+	dapl_os_wait_object_init(&tp->coll_event);
+
+	/* non-blocking, FCA calls in work thread */
+	return coll_thread_init(hca) ? 1 : 0;
+}
+
+/*
+ * Tear down the collective service of an HCA: release the FCA context if
+ * one exists, stop the worker thread and destroy its wake-up event.
+ *
+ * NOTE(review): the FCA context is cleaned up before the worker thread is
+ * stopped - confirm the thread cannot still be using it at that point.
+ */
+void dapli_free_collective_service(IN struct dapl_hca *hca)
+{
+	ib_hca_transport_t *tp;
+
+	tp = &hca->ib_trans;
+
+	if (tp->m_ctx != NULL) {
+		fca_cleanup(tp->m_ctx);
+		tp->m_ctx = NULL;
+	}
+
+	coll_thread_destroy(hca);
+	dapl_os_wait_object_destroy(&tp->coll_event);
+}
+
+/*
+ * Hand the caller the local member record and register its progress
+ * function.
+ *
+ * ia            - interface adapter whose HCA transport holds the record
+ * progress_func - stored in tp->user_func, invoked from my_progress()
+ * member_size   - out: size of the member record (tp->m_size)
+ * member        - out: pointer to the member record (tp->m_info)
+ *
+ * Returns DAT_INVALID_PARAMETER when no FCA context exists, otherwise
+ * DAT_SUCCESS.
+ */
+DAT_RETURN
+dapli_create_collective_member( IN  DAT_IA_HANDLE ia,
+				IN  void *progress_func,
+				OUT DAT_COUNT *member_size,
+				OUT DAT_IB_COLLECTIVE_MEMBER *member )
+{
+	struct dapl_hca *hca;
+	ib_hca_transport_t *tp;
+
+	hca = ((DAPL_IA*)ia)->hca_ptr;
+	tp = &hca->ib_trans;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		"create_member: hca=%p, psz=%p pmem=%p tp=%p, ctx=%p\n",
+		hca, member_size, member, tp, tp->m_ctx);
+
+	if (tp->m_ctx == NULL)
+		return DAT_INVALID_PARAMETER;
+
+	/* set the progress function, called during long offload delays */
+	tp->user_func = progress_func;
+
+	/* copy out the member info that create_member() prepared */
+	*member = tp->m_info;
+	*member_size = tp->m_size;
+
+	return DAT_SUCCESS;
+}
+
+/*
+ * Release the local member record handed out earlier.
+ *
+ * member must be the non-NULL pointer currently stored in tp->m_info,
+ * otherwise DAT_INVALID_PARAMETER is returned.  Frees any remaining FCA
+ * rank info and the member buffer, and closes the listen socket.
+ */
+DAT_RETURN
+dapli_free_collective_member( IN DAT_IA_HANDLE ia,
+			      IN DAT_IB_COLLECTIVE_MEMBER member )
+{
+	struct dapl_hca *hca = ((DAPL_IA*)ia)->hca_ptr;
+	ib_hca_transport_t *tp = &hca->ib_trans;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		"free_member: enter hca=%p, member=%p \n",
+		hca, member);
+
+	if (member == NULL)
+		return DAT_INVALID_PARAMETER;
+	if (member != tp->m_info)
+		return DAT_INVALID_PARAMETER;
+
+	/* release FCA info */
+	if (tp->f_info != NULL) {
+		fca_free_rank_info(tp->f_info);
+		tp->f_info = NULL;
+	}
+
+	/* free member buffer; known non-NULL, it equals member */
+	free(tp->m_info);
+	tp->m_info = NULL;
+
+	if (tp->l_sock > 0)
+		close(tp->l_sock);
+
+	return DAT_SUCCESS;
+}
+
+/*
+ * This asynchronous call initiates the process of creating a collective 
+ * group and must be called by all group members. The collective_group 
+ * argument points to an array of address/connection qualifier pairs that 
+ * identify the members of the group in rank order. The group_size argument 
+ * specifies the size of the group and therefore the size of the coll_group 
+ * array.  The self argument identifies the rank of the caller.  
+ * The group_id argument specifies a network-unique identifier for this 
+ * instance of the collective group.  All members of the group must specify 
+ * the same group_id value for the same collective instance. The evd_handle 
+ * argument specifies the EVD used for all asynchronous collective completions 
+ * including this call. The user_context argument will be returned in the 
+ * DAT_EXT_COLLECTIVE_CREATE_DATA event.
+ *
+ * On a successful completion, each group member will receive a 
+ * DAT_EXT_COLLECTIVE_CREATE_DATA event on the EVD specified by evd_handle. 
+ * The event contains the collective handle, the rank of the receiving 
+ * endpoint within the collective group, the size of the group, and the 
+ * caller specified user_context. The returned collective handle can be used 
+ * in network clock, multicast, and other collective operations.
+ *
+ * Multiple collective groups can be defined and an endpoint may belong 
+ * to more than one collective group. 
+ */
+/*
+ * Allocate a collective group object and schedule its creation on the
+ * worker thread.  Completion is reported later through an event posted by
+ * create_group().
+ *
+ * evd_handle - EVD that receives the collective events of this group
+ * pz         - protection zone stored with the group
+ * members    - array of 'ranks' member records, each laid out as FCA rank
+ *              info (tp->f_size bytes) followed by a struct sockaddr_in
+ * ranks      - number of members in the group
+ * self       - rank of the caller, 0 <= self < ranks
+ * id         - group id shared by all members
+ * g_info     - local rank / local size layout information
+ * user_ctx   - returned in the creation event
+ *
+ * Returns DAT_SUCCESS once the request is queued.  On error nothing stays
+ * allocated and the matching error code is returned.
+ */
+DAT_RETURN
+dapli_create_collective_group(
+	IN  DAT_EVD_HANDLE		evd_handle,
+	IN  DAT_PZ_HANDLE		pz,
+	IN  DAT_IB_COLLECTIVE_MEMBER	*members,
+	IN  DAT_COUNT			ranks,
+	IN  DAT_IB_COLLECTIVE_RANK	self,
+	IN  DAT_IB_COLLECTIVE_ID	id,
+	IN  DAT_IB_COLLECTIVE_GROUP	*g_info,
+	IN  DAT_CONTEXT			user_ctx)
+{
+	DAPL_EVD *evd = (DAPL_EVD*)evd_handle;
+	DAPL_IA *ia;
+	ib_hca_transport_t *tp;
+	struct coll_group *group;
+	DAT_RETURN dat_status;
+	int i;
+
+	/* Validate handles and arguments before anything is dereferenced */
+	if (DAPL_BAD_HANDLE(evd, DAPL_MAGIC_EVD) ||
+	    DAPL_BAD_HANDLE(pz, DAPL_MAGIC_PZ))
+		return(dapl_convert_errno(EINVAL, " coll_grp"));
+
+	if (members == NULL || g_info == NULL || ranks <= 0 ||
+	    (int)self < 0 || (int)self >= ranks)
+		return(dapl_convert_errno(EINVAL, " coll_grp"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		" create_grp[%d]: enter evd=%p cq=%p pz=%p "
+		"m=%p *m=%p t_ranks=%d g_id=%d l_idx=%d l_ranks=%d\n",
+		self, evd, evd->ib_cq_handle, pz,
+		members, *members, ranks, id, g_info->local_rank,
+		g_info->local_size);
+
+	ia = (DAPL_IA*)evd->header.owner_ia;
+	tp = &ia->hca_ptr->ib_trans;
+
+	/* no FCA context means the collective service is not available */
+	if (!tp->m_ctx)
+		return DAT_INVALID_PARAMETER;
+
+	/* Allocate group object */
+	group = (struct coll_group *)dapl_os_alloc(sizeof(*group));
+	if (!group)
+		return(dapl_convert_errno(ENOMEM," create_grp"));
+	dapl_os_memzero(group, sizeof(*group));
+
+	/* Initialize the header and save group info, COLLECTIVE handle */
+	group->header.provider = ia->header.provider;
+	group->header.handle_type = DAT_IB_HANDLE_TYPE_COLLECTIVE;
+	group->header.magic = DAPL_MAGIC_COLL;
+	group->header.owner_ia = ia;
+	group->user_context = user_ctx;
+	group->evd = (DAPL_EVD*)evd;
+	group->pz = (DAPL_PZ*)pz;
+	group->ranks = ranks;
+	group->id = id;
+	group->self = self;
+	group->ctx = tp->m_ctx;
+	group->tp = tp;
+
+	/* Rank0 connected sockets for group, to exchange information */
+	if (self == 0) {
+		group->conn = (int *)dapl_os_alloc(ranks * sizeof(*group->conn));
+		if (group->conn == NULL) {
+			dat_status = dapl_convert_errno(ENOMEM," create_grp connections");
+			goto error;
+		}
+		dapl_os_memzero(group->conn, ranks * sizeof(*group->conn));
+	}
+
+	/* need FCA information in array for new comm group call */
+	group->fca_info = dapl_os_alloc(ranks * tp->f_size);
+	if (!group->fca_info) {
+		dat_status = dapl_convert_errno(ENOMEM," create_grp fca_info");
+		goto error;
+	}
+	dapl_os_memzero(group->fca_info, ranks * tp->f_size);
+
+	/* need address information in array to reach rank 0 */
+	group->addr_info = (struct sockaddr_in*)dapl_os_alloc(ranks * sizeof(struct sockaddr_in));
+	if (!group->addr_info) {
+		dat_status = dapl_convert_errno(ENOMEM," create_grp addr_info");
+		goto error;
+	}
+	dapl_os_memzero(group->addr_info, ranks * sizeof(struct sockaddr_in));
+
+	/*
+	 * allocate object pool for non-blocking collective operations.
+	 * Done before fca_comm_new() so nothing can fail once the FCA
+	 * communicator exists.
+	 */
+	group->op_pool = (struct coll_op *)dapl_os_alloc(sizeof(struct coll_op) * COLL_OP_CNT);
+	if (!group->op_pool) {
+		dat_status = dapl_convert_errno(ENOMEM," create_grp");
+		goto error;
+	}
+	dapl_os_memzero(group->op_pool, sizeof(struct coll_op) * COLL_OP_CNT);
+
+	/* Separate group member info into Address and FCA arrays */
+	for (i=0; i<ranks; i++) {
+		memcpy((void*) ((char*)group->fca_info + (i * tp->f_size)),
+		       (void*) *(members + i),
+		       tp->f_size);
+		memcpy((void*) ((char*)group->addr_info + (i * sizeof(struct sockaddr_in))),
+		       (void*) ((char*)(*(members + i)) + tp->f_size),
+		       sizeof(struct sockaddr_in));
+	}
+
+	/* Intranode and Internode process layout info */
+	group->g_info = *g_info;
+
+	if (group->self == 0) {
+		/* rank 0 - create new communicator */
+		group->comm_new.rank_info = group->fca_info;
+		group->comm_new.rank_count = group->ranks;
+		group->comm_new.is_comm_world = 0; /* FIX */
+
+		dapl_log(DAPL_DBG_TYPE_EXTENSION,
+			 " create_grp[%d]: calling comm_new..\n", group->self);
+
+		if (fca_comm_new(group->ctx, &group->comm_new, &group->comm_desc)) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " create_grp: fca_comm_new() ERR: %s",
+				 strerror(errno));
+			dat_status = dapl_convert_errno(errno, " fca_comm_new");
+			goto error;
+		}
+	}
+
+	/* initialize all lists, events, etc */
+	dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&group->list_entry);
+	dapl_llist_init_head(&group->op_free);
+	dapl_llist_init_head(&group->op_pend);
+	dapl_os_wait_object_init(&group->op_event);
+
+	/* non-blocking, schedule on work thread */
+	dapl_os_lock(&tp->coll_lock);
+	dapl_llist_add_tail(&tp->grp_list, (DAPL_LLIST_ENTRY *)&group->list_entry, group);
+	dapl_os_unlock(&tp->coll_lock);
+	dapl_os_wait_object_wakeup(&tp->coll_event);
+
+	return DAT_SUCCESS;
+error:
+	/*
+	 * clean up partial group: only memory has been acquired when this
+	 * point is reached, no FCA communicator and no wait object
+	 */
+	if (group->op_pool)
+		dapl_os_free(group->op_pool, sizeof(struct coll_op) * COLL_OP_CNT);
+	if (group->addr_info)
+		dapl_os_free(group->addr_info, ranks * sizeof(struct sockaddr_in));
+	if (group->fca_info)
+		dapl_os_free(group->fca_info, ranks * tp->f_size);
+	if (group->conn)
+		dapl_os_free(group->conn, ranks * sizeof(*group->conn));
+
+	group->header.magic = DAPL_MAGIC_INVALID;
+	dapl_os_free(group, sizeof(struct coll_group));
+
+	return(dat_status);
+}
+
+
+/* 
+ * This synchronous call destroys a previously created collective group 
+ * associated with the collective_handle argument.  Any pending or 
+ * in-process requests associated with the collective group will be 
+ * terminated and be posted to the appropriate EVD.
+ */
+/*
+ * Destroy a collective group and release what it owns: the client socket,
+ * the rank-0 connection array, the FCA/address arrays, the operation pool,
+ * the FCA communicator and the group object itself.  Rank 0 additionally
+ * ends the communicator id in the FCA context.
+ *
+ * Also used on partially built groups, so every member is checked before
+ * it is released.
+ *
+ * Returns DAT_SUCCESS, or an EINVAL-derived error for a bad handle.
+ *
+ * NOTE(review): group->op_event is not destroyed here because this call
+ * can be reached before the wait object has been initialised - confirm
+ * where it should be destroyed.
+ */
+DAT_RETURN
+dapli_free_collective_group(
+        IN DAT_IB_COLLECTIVE_HANDLE	coll_handle)
+{
+	struct coll_group *group = (struct coll_group *)coll_handle;
+
+	if (DAPL_BAD_HANDLE(coll_handle, DAPL_MAGIC_COLL))
+		return(dapl_convert_errno(EINVAL, " free_grp"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+                 " free_coll_group[%d]: pz=%p gp=%p complete!\n",
+		 group->self, group->pz, group );
+
+	/* reset magic and free memory */
+	group->header.magic = DAPL_MAGIC_INVALID;
+
+	/* free client socket resources; 0 and negative mean "not open" */
+	if (group->sock > 0) {
+		close(group->sock);
+		group->sock = 0;
+	}
+
+	if (group->conn)
+		dapl_os_free(group->conn,
+			     group->ranks *
+			     sizeof(*group->conn));
+
+	/* FCA and address info arrays, sized as they were allocated */
+	if (group->fca_info)
+		dapl_os_free(group->fca_info,
+			     group->ranks *
+			     group->tp->f_size);
+	if (group->addr_info)
+		dapl_os_free(group->addr_info,
+			     group->ranks *
+			     sizeof(struct sockaddr_in));
+
+	/* object pool for non-blocking collective operations */
+	if (group->op_pool)
+		dapl_os_free(group->op_pool,
+			     sizeof(struct coll_op) * COLL_OP_CNT);
+
+	/* the communicator only exists once fca_comm_init() succeeded */
+	if (group->comm)
+		fca_comm_destroy(group->comm);
+
+	if (group->self == 0)
+		fca_comm_end(group->ctx, group->comm_desc.comm_id);
+
+	dapl_os_free(group, sizeof(struct coll_group));
+
+	return DAT_SUCCESS;
+}
+
+/* 
+ * This call will synchronize all endpoints of the collective
+ * group specified by coll_handle. This is an asynchronous call that 
+ * will post a completion to the collective EVD when all endpoints 
+ * have synchronized. 
+ */
+/*
+ * Run an FCA barrier on the group and, when it succeeds, post a
+ * DAT_IB_COLLECTIVE_BARRIER_STATUS event carrying user_context to the
+ * group's EVD.  The FCA call is made synchronously from this function.
+ *
+ * Returns DAT_SUCCESS, an EINVAL-derived error for a bad handle, or the
+ * converted FCA error when the barrier fails (no event is posted then).
+ */
+DAT_RETURN
+dapli_collective_barrier(
+        IN DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN DAT_DTO_COOKIE		user_context,
+        IN DAT_COMPLETION_FLAGS		comp_flags)
+{
+	struct coll_group *group = (struct coll_group *)coll_handle;
+	DAT_IB_EXTENSION_EVENT_DATA eventx;
+	int ret;
+
+	if (DAPL_BAD_HANDLE(coll_handle, DAPL_MAGIC_COLL))
+		return(dapl_convert_errno(EINVAL, " barrier"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		" coll_barrer: grp_hndl=%p u_ctx=%llx flgs=%d\n",
+		coll_handle, user_context, comp_flags);
+
+	ret = fca_do_barrier(group->comm);
+	if (ret < 0)
+		return(dapl_convert_errno(-ret, " fca_barrier"));
+
+	/* setup and post successful barrier, make sync for now */
+	eventx.coll.context = user_context;
+	eventx.coll.handle = group;
+	eventx.status = DAT_OP_SUCCESS;
+	eventx.type = DAT_IB_COLLECTIVE_BARRIER_STATUS;
+	dapls_evd_post_event_ext(group->evd, DAT_IB_COLLECTIVE_EVENT, 0,
+				 (DAT_UINT64*)&eventx);
+
+	return DAT_SUCCESS;
+}
+
+/* 
+ * This call performs a broadcast send operation that transfers
+ * data specified by the buffer argument of the root into the buffer argument 
+ * of all other endpoints in the collective group specified by coll_handle.  
+ * The operation is completed on the collective EVD unless completions are 
+ * suppressed through the completion flags.  All broadcasts are considered 
+ * "in place" transfers.  The tables below show the result of a broadcast 
+ * operation. 
+ */
+/*
+ * Run an FCA broadcast of byte_count bytes at buffer, rooted at rank
+ * 'root', and post a DAT_IB_COLLECTIVE_BROADCAST_STATUS event carrying
+ * user_context to the group's EVD when it succeeds.  The FCA call is made
+ * synchronously from this function.
+ *
+ * Returns DAT_SUCCESS, an EINVAL-derived error for a bad handle, or the
+ * converted FCA error when the broadcast fails (no event is posted then).
+ */
+DAT_RETURN
+dapli_collective_broadcast(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			buffer,
+ 	IN  DAT_COUNT			byte_count,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			user_context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	struct coll_group *group = (struct coll_group *)coll_handle;
+	DAT_IB_EXTENSION_EVENT_DATA eventx;
+	struct fca_bcast_spec bcast;
+	int ret;
+
+	if (DAPL_BAD_HANDLE(coll_handle, DAPL_MAGIC_COLL))
+		return(dapl_convert_errno(EINVAL, " fca_bcast"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " coll_bcast[%d]: group=%p buf=%p size=%d root=%d"
+		 " ctxt=%llx flgs=%d\n",
+		 group->self, coll_handle, buffer, byte_count, root,
+		 user_context, comp_flags );
+
+	/* describe the transfer for FCA */
+	bcast.buf = buffer;
+	bcast.size = byte_count;
+	bcast.root = root;
+
+	ret = fca_do_bcast(group->comm, &bcast);
+	if (ret < 0)
+		return(dapl_convert_errno(-ret, " fca_bcast"));
+
+	/* setup and post successful bcast, make sync for now */
+	eventx.coll.context = user_context;
+	eventx.coll.handle = group;
+	eventx.status = DAT_OP_SUCCESS;
+	eventx.type = DAT_IB_COLLECTIVE_BROADCAST_STATUS;
+	dapls_evd_post_event_ext(group->evd, DAT_IB_COLLECTIVE_EVENT, 0,
+				 (DAT_UINT64*)&eventx);
+
+	return DAT_SUCCESS;
+}
+
+
+/*
+ * dapli_collective_reduce
+ *
+ * Run an FCA reduce across the collective group, rooted at rank root,
+ * applying reduction operation op to elements of data type type.
+ * snd_len is a byte count; it is converted to an element count with
+ * fca_dsize(type).  When this rank is the root and snd_buf is NULL
+ * (MPI_IN_PLACE), rcv_buf is used as the send buffer as well.
+ *
+ * The operation is synchronous for now: on success a
+ * DAT_IB_COLLECTIVE_REDUCE_STATUS event is posted to the group EVD.
+ * rcv_len and comp_flags are only logged here.
+ *
+ * Input:
+ *	coll_handle	collective group handle (struct coll_group)
+ *	snd_buf		send buffer, NULL on root for in-place
+ *	snd_len		send length in bytes
+ *	rcv_buf		receive buffer
+ *	rcv_len		receive length (logged only)
+ *	op		reduction operation, mapped with fca_op()
+ *	type		element data type, mapped with fca_dtype()
+ *	root		root rank of the reduction
+ *	user_context	returned in the completion event
+ *	comp_flags	completion flags (logged only)
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	dapl_convert_errno() result on a bad handle, an unusable element
+ *	size, or an FCA failure
+ */
+DAT_RETURN
+dapli_collective_reduce(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+	IN  DAT_COUNT				rcv_len,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_IB_COLLECTIVE_RANK		root,
+	IN  DAT_CONTEXT				user_context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags)
+{
+	DAT_IB_EXTENSION_EVENT_DATA eventx;
+	struct coll_group *group = (struct coll_group *)coll_handle;
+	fca_reduce_spec_t reduce;
+	int dsize;
+	int ret;
+
+	/* validate the handle before anything below dereferences it */
+	if (DAPL_BAD_HANDLE(coll_handle, DAPL_MAGIC_COLL))
+		return(dapl_convert_errno(EINVAL, " reduce"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " coll_reduce[%d]: group=%p sbuf=%p slen=%d rbuf=%p rlen=%d"
+		 " root=%d op=%d type=%d ctxt=%llx cflgs=%d\n",
+		 group->self, coll_handle, snd_buf, snd_len,
+		 rcv_buf, rcv_len, root, op, type,
+		 user_context, comp_flags );
+
+	/* element size is used as a divisor below, never divide by zero */
+	dsize = fca_dsize(type);
+	if (dsize <= 0)
+		return(dapl_convert_errno(EINVAL, " reduce"));
+
+	reduce.root = root;
+	reduce.sbuf = snd_buf;
+	reduce.rbuf = rcv_buf;
+	reduce.buf_size = snd_len; /* bytes */
+	reduce.dtype = fca_dtype(type);
+	reduce.length = snd_len/dsize;  /* bytes to elements */
+	reduce.op = fca_op(op);
+
+	if (group->self == root && snd_buf == NULL) /* MPI_IN_PLACE */
+		reduce.sbuf = rcv_buf;
+
+	ret = fca_do_reduce(group->comm, &reduce);
+	if (ret < 0)
+		return(dapl_convert_errno(-ret, " fca_reduce"));
+
+	/* setup and post successful reduce, make sync for now */
+	eventx.type = DAT_IB_COLLECTIVE_REDUCE_STATUS;
+	eventx.status = DAT_OP_SUCCESS;
+	eventx.coll.handle = group;
+	eventx.coll.context = user_context;
+	dapls_evd_post_event_ext(group->evd, DAT_IB_COLLECTIVE_EVENT, 0, (DAT_UINT64*)&eventx);
+
+	return DAT_SUCCESS;
+}
+
+/*
+ * dapli_collective_allreduce
+ *
+ * Run an FCA all-reduce across the collective group, applying reduction
+ * operation op to elements of data type type.  snd_len is a byte count;
+ * it is converted to an element count with fca_dsize(type).  A NULL
+ * snd_buf (MPI_IN_PLACE) makes rcv_buf the send buffer as well.
+ *
+ * The operation is synchronous for now: on success a
+ * DAT_IB_COLLECTIVE_ALLREDUCE_STATUS event is posted to the group EVD.
+ * rcv_len and comp_flags are only logged here.
+ *
+ * Input:
+ *	coll_handle	collective group handle (struct coll_group)
+ *	snd_buf		send buffer, NULL for in-place
+ *	snd_len		send length in bytes
+ *	rcv_buf		receive buffer
+ *	rcv_len		receive length (logged only)
+ *	op		reduction operation, mapped with fca_op()
+ *	type		element data type, mapped with fca_dtype()
+ *	user_context	returned in the completion event
+ *	comp_flags	completion flags (logged only)
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	dapl_convert_errno() result on a bad handle, an unusable element
+ *	size, or an FCA failure
+ */
+DAT_RETURN
+dapli_collective_allreduce(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+	IN  DAT_COUNT				rcv_len,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_CONTEXT				user_context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags)
+{
+	DAT_IB_EXTENSION_EVENT_DATA eventx;
+	struct coll_group *group = (struct coll_group *)coll_handle;
+	fca_reduce_spec_t reduce;
+	int dsize;
+	int ret;
+
+	/* same handle validation as barrier, broadcast and reduce */
+	if (DAPL_BAD_HANDLE(coll_handle, DAPL_MAGIC_COLL))
+		return(dapl_convert_errno(EINVAL, " allreduce"));
+
+	/* element size is used as a divisor below, never divide by zero */
+	dsize = fca_dsize(type);
+	if (dsize <= 0)
+		return(dapl_convert_errno(EINVAL, " allreduce"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " coll_allreduce[%d]: group=%p sbuf=%p slen=%d,%d rbuf=%p rlen=%d"
+		 " op=%d type=%d ctxt=%llx cflgs=%d\n",
+		 group->self, coll_handle, snd_buf, snd_len,
+		 snd_len/dsize,
+		 rcv_buf, rcv_len, op, type,
+		 user_context, comp_flags );
+
+	reduce.root = 0; /* ignored for allreduce */
+	reduce.sbuf = snd_buf;
+	reduce.rbuf = rcv_buf;
+	reduce.buf_size = snd_len; /* bytes */
+	reduce.dtype = fca_dtype(type);
+	reduce.length = snd_len/dsize;  /* bytes to elements */
+	reduce.op = fca_op(op);
+
+	if (snd_buf == NULL) /* MPI_IN_PLACE */
+		reduce.sbuf = rcv_buf;
+
+	ret = fca_do_all_reduce(group->comm, &reduce);
+	if (ret < 0)
+		return(dapl_convert_errno(-ret, " fca_allreduce"));
+
+	/* setup and post successful allreduce, make sync for now */
+	eventx.type = DAT_IB_COLLECTIVE_ALLREDUCE_STATUS;
+	eventx.status = DAT_OP_SUCCESS;
+	eventx.coll.handle = group;
+	eventx.coll.context = user_context;
+	dapls_evd_post_event_ext(group->evd, DAT_IB_COLLECTIVE_EVENT, 0, (DAT_UINT64*)&eventx);
+
+	return DAT_SUCCESS;
+}
+
+/*
+ * dapli_collective_scatter
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ *
+ * Intended semantics of the operation, for reference:
+ *
+ * This call performs a scatter of the data specified by the
+ * send_buffer argument to the collective group specified by coll_handle.
+ * Data is received in the buffer specified by the recv_buffer argument.
+ * The recv_byte_count argument specifies the size of the receive buffer.
+ * Data from the root send_buffer will be divided by the number of members
+ * in the collective group to form equal and contiguous memory partitions.
+ * Each member of the collective group will receive its rank relative
+ * partition.  An error is returned if the send_byte_count does not describe
+ * memory that can be evenly divided by the size of the collective group.
+ * An "in place" transfer for the root rank can be indicated by passing NULL
+ * as the recv_buffer argument. The send_buffer and send_byte_count
+ * arguments are ignored on non-root members. The operation is completed on
+ * the collective EVD unless completions are suppressed through the
+ * completion flags.
+ */
+DAT_RETURN
+dapli_collective_scatter(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	return DAT_NOT_IMPLEMENTED;
+}
+
+/*
+ * dapli_collective_scatterv
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ *
+ * Intended semantics of the operation, for reference:
+ *
+ * This call performs a non-uniform scatter of the data
+ * specified by the send_buffers array argument to the collective group
+ * specified by coll_handle.  The send_buffers array contains one buffer
+ * pointer for each member of the collective group, in rank order.
+ * The send_byte_counts array contains a byte count for each corresponding
+ * send buffer pointer.  The recv_buffer and recv_byte_count arguments
+ * specify where received portions of the scatter are to be received.
+ * An "in place" transfer for the root rank can be indicated by passing
+ * NULL as the recv_buffer argument. The send_buffers and send_byte_counts
+ * arguments are ignored on non-root members.  The operation is completed
+ * on the collective EVD unless completions are suppressed through the
+ * completion flags.
+ */
+DAT_RETURN
+dapli_collective_scatterv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			*snd_bufs,
+ 	IN  DAT_COUNT			*snd_lens,
+ 	IN  DAT_COUNT			*displs,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	return DAT_NOT_IMPLEMENTED;
+}
+
+/*
+ * dapli_collective_gather
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ *
+ * Intended semantics of the operation, for reference:
+ *
+ * This call performs a gather of the data sent by all
+ * members of the collective specified by the collective_handle argument.
+ * The data to be sent is specified by the send_buffer and send_byte_count
+ * arguments. Data is received by the collective member specified by the
+ * root argument in the buffer specified by the recv_buffer and
+ * recv_byte_count arguments.  Data is placed into the receive buffer in
+ * collective rank order.  An "in place" transfer for the root rank can
+ * be indicated by passing NULL as the send_buffer argument.
+ * The recv_buffer and recv_byte_count arguments are ignored on non-root
+ * members.  The operation is completed on the collective EVD unless
+ * completions are suppressed through the completion flags.
+ */
+DAT_RETURN
+dapli_collective_gather(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags) 
+{
+	return DAT_NOT_IMPLEMENTED;
+}
+
+/*
+ * dapli_collective_gatherv
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ *
+ * Intended semantics of the operation, for reference:
+ *
+ * This call performs a non-uniform gather of the data sent by
+ * all members of the collective specified by the collective_handle argument.
+ * The data to be sent is specified by the send_buffer and send_byte_count
+ * arguments.  Data is received by the collective member specified by the
+ * root argument into the buffers specified by the recv_buffers and
+ * recv_byte_counts array arguments.  Data is placed into the receive buffer
+ * associated with the rank that sent it. An "in place" transfer for the root
+ * rank can be indicated by passing NULL as the send_buffer argument.
+ * The recv_buffers and recv_byte_counts arguments are ignored on non-root
+ * members.  The operation is completed on the collective EVD unless
+ * completions are suppressed through the completion flags.
+ */
+
+DAT_RETURN
+dapli_collective_gatherv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			*rcv_bufs,
+ 	IN  DAT_COUNT			*rcv_lens,
+ 	IN  DAT_COUNT			*displs,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	return DAT_NOT_IMPLEMENTED;
+}
+
+/*
+ * dapli_collective_allgather
+ *
+ * This call is equivalent to having all members of a collective
+ * group perform a dat_collective_gather() as the root.  This results in all
+ * members of the collective having identical contents in their receive buffer.
+ *
+ * snd_len bytes from snd_buf are handed to fca_do_allgather() together
+ * with rcv_buf.  A NULL snd_buf (MPI_IN_PLACE) takes this rank's
+ * contribution from inside rcv_buf, at byte offset rcv_len * rank.
+ * NOTE(review): the in-place offset uses rcv_len as the per-rank slot size
+ * while the FCA gather size is snd_len; confirm callers pass matching
+ * per-rank lengths.
+ *
+ * The operation is synchronous for now: on success a
+ * DAT_IB_COLLECTIVE_ALLGATHER_STATUS event is posted to the group EVD.
+ * comp_flags is only logged here.
+ *
+ * Input:
+ *	coll_handle	collective group handle (struct coll_group)
+ *	snd_buf		send buffer, NULL for in-place
+ *	snd_len		send length in bytes
+ *	rcv_buf		receive buffer
+ *	rcv_len		receive length, used for the in-place offset
+ *	user_context	returned in the completion event
+ *	comp_flags	completion flags (logged only)
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	dapl_convert_errno() result on a bad handle or an FCA failure
+ */
+DAT_RETURN
+dapli_collective_allgather(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+	IN  DAT_COUNT			rcv_len,
+	IN  DAT_CONTEXT			user_context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	DAT_IB_EXTENSION_EVENT_DATA eventx;
+	struct coll_group *group = (struct coll_group *)coll_handle;
+	fca_gather_spec_t gather;
+	int ret;
+
+	/* same handle validation as barrier, broadcast and reduce */
+	if (DAPL_BAD_HANDLE(coll_handle, DAPL_MAGIC_COLL))
+		return(dapl_convert_errno(EINVAL, " allgather"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " coll_allgather[%d]: group=%p sbuf=%p slen=%d rbuf=%p rlen=%d"
+		 " ctxt=%llx cflgs=%d\n",
+		 group->self, coll_handle, snd_buf, snd_len,
+		 rcv_buf, rcv_len, user_context, comp_flags );
+
+	gather.sbuf = snd_buf;
+	gather.size = snd_len;
+	gather.rbuf = rcv_buf;
+
+	/* byte offset via char *, arithmetic on void * is a GNU extension */
+	if (snd_buf == NULL) /* MPI_IN_PLACE */
+		gather.sbuf = (char *)rcv_buf + rcv_len * group->self;
+
+	ret = fca_do_allgather(group->comm, &gather);
+	if (ret < 0)
+		return(dapl_convert_errno(-ret, " fca_allgather"));
+
+	/* setup and post successful allgather, make sync for now */
+	eventx.type = DAT_IB_COLLECTIVE_ALLGATHER_STATUS;
+	eventx.status = DAT_OP_SUCCESS;
+	eventx.coll.handle = group;
+	eventx.coll.context = user_context;
+	dapls_evd_post_event_ext(group->evd, DAT_IB_COLLECTIVE_EVENT, 0, (DAT_UINT64*)&eventx);
+
+	return DAT_SUCCESS;
+}
+
+/*
+ * dapli_collective_allgatherv
+ *
+ * This call performs a non-uniform dat_collective_allgather()
+ * operation.  It is equivalent to having all members of a collective group
+ * perform a dat_collective_gatherv() as the root.  This results in all
+ * members of the collective having identical contents in their receive
+ * buffer.
+ *
+ * rcv_bufs is handed to FCA as a single receive buffer base address,
+ * with rcv_lens and displs as the per-rank size and displacement arrays.
+ * A NULL snd_buf (MPI_IN_PLACE) takes this rank's contribution from
+ * inside the receive buffer at displs[rank].
+ * NOTE(review): displs is applied as a byte displacement, in line with
+ * the byte based send size; confirm against the FCA gatherv spec.
+ *
+ * The operation is synchronous for now: on success a completion event is
+ * posted to the group EVD.  comp_flags is only logged here.
+ * NOTE(review): the event type posted is DAT_IB_COLLECTIVE_ALLGATHER_STATUS,
+ * the same as for allgather; confirm whether a distinct allgatherv status
+ * type is intended.
+ *
+ * Input:
+ *	coll_handle	collective group handle (struct coll_group)
+ *	snd_buf		send buffer, NULL for in-place
+ *	snd_len		send length in bytes
+ *	rcv_bufs	receive buffer base address
+ *	rcv_lens	per-rank receive sizes
+ *	displs		per-rank displacements into the receive buffer
+ *	user_context	returned in the completion event
+ *	comp_flags	completion flags (logged only)
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	dapl_convert_errno() result on a bad handle, missing in-place
+ *	arguments, or an FCA failure
+ */
+DAT_RETURN
+dapli_collective_allgatherv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			*rcv_bufs,
+	IN  DAT_COUNT			*rcv_lens,
+	IN  DAT_COUNT			*displs,
+	IN  DAT_CONTEXT			user_context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	DAT_IB_EXTENSION_EVENT_DATA eventx;
+	struct coll_group *group = (struct coll_group *)coll_handle;
+	fca_gatherv_spec_t gatherv;
+	int ret;
+
+	/* same handle validation as barrier, broadcast and reduce */
+	if (DAPL_BAD_HANDLE(coll_handle, DAPL_MAGIC_COLL))
+		return(dapl_convert_errno(EINVAL, " allgatherv"));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " coll_allgatherv[%d]: group=%p sbuf=%p slen=%d rbufs=%p rlens=%p"
+		 " displs=%p ctxt=%llx cflgs=%d\n",
+		 group->self, coll_handle, snd_buf, snd_len,
+		 rcv_bufs, rcv_lens, displs, user_context, comp_flags );
+
+	gatherv.sbuf = snd_buf;
+	gatherv.sendsize = snd_len;
+	gatherv.rbuf = rcv_bufs;
+	gatherv.recvsizes = rcv_lens;
+	gatherv.displs = displs;
+
+	if (snd_buf == NULL) { /* MPI_IN_PLACE */
+		if (rcv_bufs == NULL || displs == NULL)
+			return(dapl_convert_errno(EINVAL, " allgatherv"));
+		/*
+		 * Offset in bytes: arithmetic directly on the DAT_PVOID *
+		 * would scale the displacement by sizeof(void *).
+		 */
+		gatherv.sbuf = (char *)rcv_bufs + displs[group->self];
+	}
+
+	ret = fca_do_allgatherv(group->comm, &gatherv);
+	if (ret < 0)
+		return(dapl_convert_errno(-ret, " fca_allgatherv"));
+
+	/* setup and post successful allgatherv, make sync for now */
+	eventx.type = DAT_IB_COLLECTIVE_ALLGATHER_STATUS;
+	eventx.status = DAT_OP_SUCCESS;
+	eventx.coll.handle = group;
+	eventx.coll.context = user_context;
+	dapls_evd_post_event_ext(group->evd, DAT_IB_COLLECTIVE_EVENT, 0, (DAT_UINT64*)&eventx);
+
+	return DAT_SUCCESS;
+}
+
+/*
+ * dapli_collective_alltoall
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ */
+DAT_RETURN
+dapli_collective_alltoall(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	return DAT_NOT_IMPLEMENTED;
+}
+
+/*
+ * dapli_collective_alltoallv
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ */
+DAT_RETURN
+dapli_collective_alltoallv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			*snd_bufs,
+ 	IN  DAT_COUNT			*snd_lens,
+ 	IN  DAT_COUNT			*snd_displs,
+	IN  DAT_PVOID			*rcv_bufs,
+ 	IN  DAT_COUNT			*rcv_lens,
+ 	IN  DAT_COUNT			*rcv_displs,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags)
+{
+	return DAT_NOT_IMPLEMENTED;
+}
+
+/*
+ * dapli_collective_reduce_scatter
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ */
+DAT_RETURN
+dapli_collective_reduce_scatter(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+ 	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+ 	IN  DAT_COUNT				*rcv_lens,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_CONTEXT				user_context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags)
+{
+	return DAT_NOT_IMPLEMENTED;	
+}
+
+/*
+ * dapli_collective_scan
+ *
+ * NOT IMPLEMENTED by this provider: every argument is ignored and
+ * DAT_NOT_IMPLEMENTED is returned unconditionally.
+ */
+DAT_RETURN
+dapli_collective_scan(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+ 	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+ 	IN  DAT_COUNT				rcv_len,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_CONTEXT				user_context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags)
+{
+	return DAT_NOT_IMPLEMENTED;	
+}
+
+#endif /* DAT_FCA_PROVIDER */
+#endif /* DAT_IB_COLLECTIVES */
+
diff --git a/dapl/openib_common/collectives/fca_provider.h b/dapl/openib_common/collectives/fca_provider.h
new file mode 100755
index 0000000..0b819ae
--- /dev/null
+++ b/dapl/openib_common/collectives/fca_provider.h
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2011 Intel Corporation.  All rights reserved.
+ * 
+ * This Software is licensed under one of the following licenses:
+ * 
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ *    in the file LICENSE.txt in the root directory. The license is also
+ *    available from the Open Source Initiative, see 
+ *    http://www.opensource.org/licenses/cpl.php.
+ * 
+ * 2) under the terms of the "The BSD License" a copy of which is in the file
+ *    LICENSE2.txt in the root directory. The license is also available from
+ *    the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/bsd-license.php.
+ * 
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ *    copy of which is in the file LICENSE3.txt in the root directory. The
+ *    license is also available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/gpl-license.php.
+ * 
+ * Licensee has the right to choose one of the above licenses.
+ * 
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ * 
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+#ifndef _FCA_PROVIDER_H_
+#define _FCA_PROVIDER_H_
+
+#include <fca/fca_api.h>
+#include <fca/config/fca_parse_specfile.h>
+
+/*
+ * Collective Group Object
+ *
+ * Per-group state of the FCA collective provider.  The
+ * DAT_IB_COLLECTIVE_HANDLE passed to the dapli_collective_* calls is cast
+ * to a pointer to this structure; those calls use self for logging, comm
+ * as the FCA communicator and evd as the target of completion events.
+ */
+struct coll_group {
+
+    DAPL_HEADER			header;		/* collective type header, group lock */
+    DAT_CONTEXT			user_context;	/* user context for group */
+    struct dapl_llist_entry	list_entry;	/* group creation list for work thread */
+
+    /* Basic group information */
+    DAPL_EVD			*evd;	  /* DAT consumer evd for group, COLL type */
+    DAPL_PZ			*pz;	  /* DAT protection domain */
+    ib_hca_transport_t		*tp;      /* IA device transport object */
+    int				id;	  /* group id */
+    int				self;	  /* my rank index */
+    int				ranks;	  /* nprocs in group */
+    int				sock; 	  /* socket, needed to get grp comm_desc */
+    int				*conn;	  /* connections to exchange member info */
+    void			*op_pool; /* operations queue buffer pool */
+    struct dapl_llist_entry	*op_pend; /* in-process, in-order operations */
+    struct dapl_llist_entry	*op_free; /* free list for operations */
+    DAPL_OS_LOCK 		op_lock;  /* operations queue lock */
+    DAPL_OS_WAIT_OBJECT 	op_event; /* operations completion event */
+
+    /* provider specific information */
+    struct fca_context 		*ctx;       /* FCA device */
+    void 			*fca_info;  /* FCA member info, element = tp->f_size */
+    struct sockaddr_in		*addr_info; /* RANK address array, element = DAT_SOCK_ADDR */
+    fca_comm_caps_t 		comm_caps;  /* FCA comm group capabilities */
+    struct fca_rank_comm	*comm;      /* FCA comm group initialized */
+    struct fca_comm_new_spec	comm_new;   /* comm new spec */
+    int				comm_id;    /* FCA comm group id */
+    fca_comm_desc_t 		comm_desc;  /* FCA comm group */
+    fca_comm_init_spec_t 	comm_init;  /* FCA comm init parameters */
+    DAT_IB_COLLECTIVE_GROUP	g_info;	    /* Process layout info */
+};
+
+/*
+ * Collective Operation Object, for non-blocking support
+ *
+ * NOTE(review): the field notes below are inferred from names and types
+ * only and should be confirmed against the code that queues operations.
+ */
+#define COLL_OP_CNT	32	/* presumably the number of coll_op entries per group pool -- TODO confirm */
+
+struct coll_op {
+    struct dapl_llist_entry		list_entry;	/* linked list linkage */
+    struct coll_group			*grp;		/* collective group of this operation */
+    DAT_IA_HANDLE 			ia;
+    enum dat_ib_op			op;		/* operation type */
+    /*
+     * NOTE(review): the fields from root through reduce_type appear to
+     * mirror the arguments of the dapli_collective_* calls (root rank,
+     * context, completion flags, send/receive buffers, sizes and
+     * displacements, reduce op and data type) -- confirm.
+     */
+    DAT_IB_COLLECTIVE_RANK		root;
+    DAT_IB_COLLECTIVE_RANK 		self;
+    DAT_CONTEXT				ctx;
+    DAT_COMPLETION_FLAGS		cflgs;
+    DAT_PVOID				sbuf;
+    DAT_COUNT   			ssize;
+    DAT_COUNT				*ssizes;
+    DAT_COUNT				*sdispls;
+    DAT_PVOID				rbuf;
+    DAT_COUNT				rsize;
+    DAT_COUNT				*rsizes;
+    DAT_COUNT				*rdispls;
+    DAT_IB_COLLECTIVE_REDUCE_DATA_OP 	reduce_op;
+    DAT_IB_COLLECTIVE_DATA_TYPE 	reduce_type;
+    DAT_UINT64				clock;
+    /*
+     * NOTE(review): progress_func, member_size and member match the
+     * parameter names of dapli_create_collective_member() -- confirm.
+     */
+    void 				*progress_func;
+    DAT_COUNT 				*member_size;
+    DAT_IB_COLLECTIVE_MEMBER 		*member;
+    DAT_RETURN				status;		/* presumably the operation result -- confirm */
+};
+
+#endif /* _FCA_PROVIDER_H_ */
diff --git a/dapl/openib_common/collectives/ib_collectives.h b/dapl/openib_common/collectives/ib_collectives.h
new file mode 100755
index 0000000..af644ed
--- /dev/null
+++ b/dapl/openib_common/collectives/ib_collectives.h
@@ -0,0 +1,228 @@
+/*
+ * Copyright (c) 2011 Intel Corporation.  All rights reserved.
+ * 
+ * This Software is licensed under one of the following licenses:
+ * 
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ *    in the file LICENSE.txt in the root directory. The license is also
+ *    available from the Open Source Initiative, see 
+ *    http://www.opensource.org/licenses/cpl.php.
+ * 
+ * 2) under the terms of the "The BSD License" a copy of which is in the file
+ *    LICENSE2.txt in the root directory. The license is also available from
+ *    the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/bsd-license.php.
+ * 
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ *    copy of which is in the file LICENSE3.txt in the root directory. The
+ *    license is also available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/gpl-license.php.
+ * 
+ * Licensee has the right to choose one of the above licenses.
+ * 
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ * 
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+#ifndef _IB_COLLECTIVES_H_
+#define _IB_COLLECTIVES_H_
+
+#ifdef DAT_IB_COLLECTIVES
+
+/* DAPL handle magic for collective */
+#define	DAPL_MAGIC_COLL	0xbabeface
+
+/* IB Collective Provider */
+#ifdef DAT_FCA_PROVIDER
+#include <collectives/fca_provider.h>
+#endif 
+
+/* IB Collective Provider Prototypes */
+int  dapli_create_collective_service(IN struct dapl_hca *hca);
+void dapli_free_collective_service(IN struct dapl_hca *hca);
+
+DAT_RETURN
+dapli_create_collective_member(
+	IN  DAT_IA_HANDLE		ia_handle,
+	IN  void			*progress_func,
+	OUT DAT_COUNT			*member_size,
+	OUT DAT_IB_COLLECTIVE_MEMBER	*member);
+
+DAT_RETURN
+dapli_free_collective_member(
+	IN  DAT_IA_HANDLE		ia_handle,
+	IN DAT_IB_COLLECTIVE_MEMBER	member);
+
+DAT_RETURN
+dapli_create_collective_group(
+	IN  DAT_EVD_HANDLE		evd_handle,
+	IN  DAT_PZ_HANDLE		pz,
+	IN  DAT_IB_COLLECTIVE_MEMBER	*members,
+	IN  DAT_COUNT			ranks,
+	IN  DAT_IB_COLLECTIVE_RANK	self,
+	IN  DAT_IB_COLLECTIVE_ID	id,
+	IN  DAT_IB_COLLECTIVE_GROUP	*g_info,
+	IN  DAT_DTO_COOKIE		user_ctx);
+
+DAT_RETURN
+dapli_free_collective_group(
+        IN DAT_IB_COLLECTIVE_HANDLE	coll_handle);
+
+DAT_RETURN
+dapli_collective_barrier(
+        IN DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN DAT_DTO_COOKIE		user_context,
+        IN DAT_COMPLETION_FLAGS		comp_flags);
+
+DAT_RETURN
+dapli_collective_broadcast(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			buffer,
+ 	IN  DAT_COUNT			byte_count,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			user_context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_reduce(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+ 	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+ 	IN  DAT_COUNT				rcv_len,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_IB_COLLECTIVE_RANK		root,
+	IN  DAT_CONTEXT				user_context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags);
+
+DAT_RETURN
+dapli_collective_allreduce(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+ 	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+ 	IN  DAT_COUNT				rcv_len,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_CONTEXT				context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags);
+
+DAT_RETURN
+dapli_collective_scatter(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_scatterv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			*snd_bufs,
+ 	IN  DAT_COUNT			*snd_lens,
+ 	IN  DAT_COUNT			*displs,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_gather(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_gatherv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			*rcv_bufs,
+ 	IN  DAT_COUNT			*rcv_lens,
+ 	IN  DAT_COUNT			*displs,
+	IN  DAT_IB_COLLECTIVE_RANK	root,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_allgather(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_allgatherv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			*rcv_bufs,
+ 	IN  DAT_COUNT			*rcv_lens,
+ 	IN  DAT_COUNT			*displs,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_alltoall(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			snd_buf,
+ 	IN  DAT_COUNT			snd_len,
+	IN  DAT_PVOID			rcv_buf,
+ 	IN  DAT_COUNT			rcv_len,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_alltoallv(
+	IN  DAT_IB_COLLECTIVE_HANDLE	coll_handle,
+	IN  DAT_PVOID			*snd_bufs,
+ 	IN  DAT_COUNT			*snd_lens,
+ 	IN  DAT_COUNT			*snd_displs,
+	IN  DAT_PVOID			*rcv_bufs,
+ 	IN  DAT_COUNT			*rcv_lens,
+ 	IN  DAT_COUNT			*rcv_displs,
+	IN  DAT_CONTEXT			context,
+	IN  DAT_COMPLETION_FLAGS	comp_flags);
+
+DAT_RETURN
+dapli_collective_reduce_scatter(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+ 	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+ 	IN  DAT_COUNT				*rcv_lens,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_CONTEXT				user_context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags);
+
+DAT_RETURN
+dapli_collective_scan(
+	IN  DAT_IB_COLLECTIVE_HANDLE		coll_handle,
+	IN  DAT_PVOID				snd_buf,
+ 	IN  DAT_COUNT				snd_len,
+	IN  DAT_PVOID				rcv_buf,
+ 	IN  DAT_COUNT				rcv_len,
+	IN  DAT_IB_COLLECTIVE_REDUCE_DATA_OP	op,
+	IN  DAT_IB_COLLECTIVE_DATA_TYPE		type,
+	IN  DAT_CONTEXT				user_context,
+	IN  DAT_COMPLETION_FLAGS		comp_flags);
+
+#endif /* DAT_IB_COLLECTIVES */
+#endif /* _IB_COLLECTIVES_H_ */
-- 
1.7.3






More information about the ewg mailing list