/* Proof of principle implementation for gfortran's libcaf_mpi. Consists of the "library" and an example program, roughly matching program main integer :: myvar[*], local_var myvar = this_image()*8 print *, "Image ", this_image(),": myvar = ", myvar sync all local_var = myvar [ mod (this_image(), num_images()) + 1 ] print *, "Image ", this_image(),": local_var = ", local_var end program main Warning: No attempt to handle errors or to write reliable/fast code has been made. See http://gcc.gnu.org/wiki/CoarrayLib for an overview about the interface which the libray should provide (incomplete, partially not up to date) See the code (linked there) for the current stub implementation (needs some fixes) And see http://gcc.gnu.org/ml/fortran/2010-04/msg00168.html for "a draft for some primitives" The front end currently handles: - SYNC ALL/IMAGES - ERROR STOP - _gfortran_caf_register (for nonallocatable coarrays) - _gfortran_caf_init - _gfortran_caf_finalize (And the cobound intrinsics, which do not require library calls.) Adding to the front-end a a call to "XYZ_Start_Read_Scalar" should be relatively simple; though, implementing all those transfer calls properly, might take some time. */ #include #include #include #include static int caf_this_image; static int caf_num_images; typedef enum caf_tags_t { CAF_TAG_SYNC, CAF_TAG_PULL_SCALAR, CAF_TAG_PULL_SCALAR_REP, CAF_ERROR_STOP } caf_tags_t; typedef enum caf_register_t { CAF_REGTYPE_COARRAY, /* Startup. */ CAF_REGTYPE_COARRAY_ALLOC, /* ALLOCATE statement. */ CAF_REGTYPE_LOCK, CAF_REGTYPE_LOCK_COMP } caf_register_t; typedef struct { void *addr; size_t size; } scalar_info; void main_loop (int new_syncs, int comm, int check_only, void *addr, size_t size) { int flag; MPI_Status status; MPI_Request rq; static int sync_cnt = 0; sync_cnt += new_syncs; for ( ; ; ) { if (sync_cnt == 0 && !check_only && !comm) return; if (check_only) { MPI_Iprobe (MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status); if (!flag) return; /* Nothing to do. */ } else MPI_Probe (MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); switch (status.MPI_TAG) { case CAF_TAG_SYNC: sync_cnt--; MPI_Recv (NULL, 0, MPI_BYTE, status.MPI_SOURCE, CAF_TAG_SYNC, MPI_COMM_WORLD, MPI_STATUS_IGNORE); break; case CAF_TAG_PULL_SCALAR: { scalar_info info; MPI_Recv (&info, sizeof(info), MPI_BYTE, status.MPI_SOURCE, CAF_TAG_PULL_SCALAR, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Ibsend (info.addr, info.size, MPI_BYTE, status.MPI_SOURCE, CAF_TAG_PULL_SCALAR_REP, MPI_COMM_WORLD, &rq); } break;; case CAF_TAG_PULL_SCALAR_REP: MPI_Recv (addr, size, MPI_BYTE, status.MPI_SOURCE, CAF_TAG_PULL_SCALAR_REP, MPI_COMM_WORLD, MPI_STATUS_IGNORE); comm = 0; check_only = 1; break; case CAF_ERROR_STOP: exit (1); /* ... room for improvement ... */ break; } } } void sync_all_1 () { int i; MPI_Request req; for (i = 1; i <= caf_num_images; i++) { if (i == caf_this_image) continue; MPI_Ibsend (NULL, 0, MPI_BYTE, i-1, CAF_TAG_SYNC, MPI_COMM_WORLD, &req); } main_loop (caf_num_images-1, 0, 0, NULL, 0); } void sync_all () { sync_all_1 (); } void sync_images (int n, int *image) { int i, nsyncs; if (n < 0) /* SYNC IMAGES(*). */ { sync_all_1 (); return; } MPI_Request req; nsyncs = 0; for (i = 0; i < n; i++) { if (i == image[i]) continue; nsyncs++; MPI_Ibsend (NULL, 0, MPI_BYTE, image[i]-1, CAF_TAG_SYNC, MPI_COMM_WORLD, &req); } main_loop (nsyncs, 0, 0, NULL, 0); } void * register_ (size_t size, size_t *token[], caf_register_t type) { void *result; result = malloc (size); *token = malloc (sizeof (size_t)*caf_num_images); /* No need to go into the message loop for initial startup. */ if (type != CAF_REGTYPE_COARRAY && type != CAF_REGTYPE_LOCK) sync_all_1 (); MPI_Allgather (&result, sizeof (size_t), MPI_BYTE, token[0], sizeof (size_t), MPI_BYTE, MPI_COMM_WORLD); return result; } void XYZ_Start_Read_Scalar (void *address_to, size_t size_to, int image_from, size_t token_from[], ptrdiff_t offset_from, int blocking) { MPI_Request req; if (image_from == caf_this_image) { memmove (address_to, (void *)token_from[caf_this_image-1], size_to); main_loop (0, 0, 1, NULL, 0); /* Check queue. */ return; } scalar_info info; info.addr = ((void *)token_from[image_from-1]) + offset_from; info.size = size_to; MPI_Ibsend (&info, sizeof (info), MPI_BYTE, image_from-1, CAF_TAG_PULL_SCALAR, MPI_COMM_WORLD, &req); main_loop (0, 1, 0, address_to, size_to); } int main () { int *myvar; size_t *myvar_token; int local_var; MPI_Init (NULL, NULL); MPI_Comm_rank (MPI_COMM_WORLD, &caf_this_image); caf_this_image++; MPI_Comm_size (MPI_COMM_WORLD, &caf_num_images); myvar = (int *) register_ (sizeof (int), &myvar_token, CAF_REGTYPE_COARRAY); *myvar = 8*caf_this_image; printf ("Image %d: myvar = %d\n", caf_this_image, *myvar); sync_all (); XYZ_Start_Read_Scalar (&local_var, sizeof(local_var), caf_this_image % caf_num_images + 1, myvar_token, 0, 0); printf ("Image %d: local_var = %d\n", caf_this_image, local_var); /* Finalize. FIXME: Need to deallocate tokens/coarrays. */ sync_all (); MPI_Finalize (); return 0; }