#define __PIO_FILE__ "pio_spmd_utils.F90" !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ! ! Module pio_spmd_utils ! ! Point-to-point implementations of ! MPI collectives, for improved performance ! and/or robustness on certain platforms ! ! ! 20090508 Initial version (based on spmd_utils in CAM) - P. Worley ! ! Code added as a work around for poor rsend performance on cray systems with ! Gemini interconnect ! #ifdef _NO_MPI_RSEND #define MPI_RSEND MPI_SEND #define mpi_rsend mpi_send #define MPI_IRSEND MPI_ISEND #define mpi_irsend mpi_isend #endif module pio_spmd_utils use pio_kinds use pio_support, only: CheckMPIReturn #ifndef NO_MPIMOD use mpi ! _EXTERNAL #endif implicit none private #ifdef NO_MPIMOD include 'mpif.h' ! _EXTERNAL #endif public :: pio_swapm interface pio_swapm ! TYPE int,real,double,long module procedure pio_swapm_{TYPE} end interface character(len=*), parameter :: modName='pio_spmd_utils' contains !======================================================================== ! integer function pair(np,p,k) integer np,p,k,q q = ieor(p,k) if(q.gt.np-1) then pair = -1 else pair = q endif return end function pair ! !======================================================================== ! integer function ceil2(n) integer n,p p=1 do while(p.lt.n) p=p*2 enddo ceil2=p return end function ceil2 ! !======================================================================== ! ! TYPE int,real,double,long subroutine pio_swapm_{TYPE} ( nprocs, mytask, & sndbuf, sbuf_siz, sndlths, sdispls, stypes, & rcvbuf, rbuf_siz, rcvlths, rdispls, rtypes, & comm, comm_hs, comm_isend, comm_maxreq ) !----------------------------------------------------------------------- ! !> Purpose: !! Reduced version of original swapm (for swap of multiple messages !! using MPI point-to-point routines), more efficiently implementing a !! subset of the swap protocols. !! !! Method: !! comm_protocol: !! comm_isend == .true.: use nonblocking send, else use blocking send !! comm_hs == .true.: use handshaking protocol !! comm_maxreq: !! =-1,0: do not limit number of outstanding send/receive requests !! >0: do not allow more than min(comm_maxreq, steps) outstanding !! nonblocking send requests or nonblocking receive requests !! !! Author of original version: P. Worley !! Ported from CAM: P. Worley, May 2009 !< !----------------------------------------------------------------------- !----------------------------------------------------------------------- implicit none !---------------------------Input arguments-------------------------- ! integer, intent(in) :: nprocs ! size of communicator integer, intent(in) :: mytask ! MPI task id with communicator integer, intent(in) :: sbuf_siz ! size of send buffer integer, intent(in) :: rbuf_siz ! size of receive buffer integer, intent(in) :: sndlths(0:nprocs-1)! length of outgoing message integer, intent(in) :: sdispls(0:nprocs-1)! offset from beginning of send ! buffer where outgoing messages ! should be sent from integer, intent(in) :: stypes(0:nprocs-1) ! MPI data types integer, intent(in) :: rcvlths(0:nprocs-1)! length of incoming messages integer, intent(in) :: rdispls(0:nprocs-1)! offset from beginning of receive ! buffer where incoming messages ! should be placed integer, intent(in) :: rtypes(0:nprocs-1) ! MPI data types {VTYPE}, intent(in) :: sndbuf(sbuf_siz) ! outgoing message buffer integer, intent(in) :: comm ! MPI communicator logical, intent(in) :: comm_hs ! handshaking protocol? logical, intent(in) :: comm_isend ! nonblocking send protocol? integer, intent(in) :: comm_maxreq ! maximum number of outstanding ! nonblocking requests !---------------------------Output arguments-------------------------- ! {VTYPE}, intent(out) :: rcvbuf(rbuf_siz) ! incoming message buffer #ifndef _MPISERIAL ! !---------------------------Local workspace------------------------------------------- ! character(len=*), parameter :: subName=modName//'::pio_swapm_{TYPE}' integer :: steps ! number of swaps to initiate integer :: swapids(nprocs) ! MPI process id of swap partners integer :: p ! process index integer :: istep ! loop index integer :: tag ! MPI message tag integer :: offset_t ! MPI message tag offset, for addressing ! message conflict bug (if necessary) integer :: offset_s ! index of message beginning in ! send buffer integer :: offset_r ! index of message beginning in ! receive buffer integer :: sndids(nprocs) ! send request ids integer :: rcvids(nprocs) ! receive request ids integer :: hs_rcvids(nprocs) ! handshake receive request ids integer :: maxreq, maxreqh ! maximum number of outstanding ! nonblocking requests (and half) integer :: hs ! handshake variable integer :: rstep ! "receive" step index logical :: handshake, sendd ! protocol option flags integer :: ier ! return error status integer :: status(MPI_STATUS_SIZE) ! MPI status ! !------------------------------------------------------------------------------------- ! #ifdef _NO_PIO_SWAPM_TAG_OFFSET offset_t = 0 #else offset_t = nprocs #endif ! ! if necessary, send to self if (sndlths(mytask) > 0) then tag = mytask + offset_t offset_r = rdispls(mytask)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(mytask), rtypes(mytask), & mytask, tag, comm, rcvids(1), ier ) call CheckMPIReturn(subName,ier) offset_s = sdispls(mytask)+1 call mpi_send( sndbuf(offset_s), sndlths(mytask), stypes(mytask), & mytask, tag, comm, ier ) call CheckMPIReturn(subName,ier) call mpi_wait( rcvids(1), status, ier ) call CheckMPIReturn(subName,ier) endif ! calculate swap partners and communication ordering steps = 0 do istep=1,ceil2(nprocs)-1 p = pair(nprocs,istep,mytask) if (p >= 0) then if (sndlths(p) > 0 .or. rcvlths(p) > 0) then steps = steps + 1 swapids(steps) = p end if end if end do if (steps .eq. 0) return ! identify communication protocol if (comm_isend) then sendd = .false. else sendd = .true. endif handshake = comm_hs ! identify maximum number of outstanding nonblocking requests to permit if (steps .eq. 1) then maxreq = 1 maxreqh = 1 else if (comm_maxreq >= -1) then maxreq = comm_maxreq else maxreq = steps endif if ((maxreq .le. steps) .and. (maxreq > 0)) then if (maxreq > 1) then maxreqh = maxreq/2 else maxreq = 2 maxreqh = 1 endif else maxreq = steps maxreqh = steps endif endif ! Four protocol options: ! (1) handshaking + blocking sends if ((handshake) .and. (sendd)) then ! Initialize hs variable hs = 1 ! Post initial handshake receive requests do istep=1,maxreq p = swapids(istep) if (sndlths(p) > 0) then tag = mytask + offset_t call mpi_irecv( hs, 1, MPI_INTEGER, p, tag, comm, & hs_rcvids(istep), ier ) call CheckMPIReturn(subName,ier) endif enddo ! Post initial receive requests do istep=1,maxreq p = swapids(istep) if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(istep), ier ) call CheckMPIReturn(subName,ier) call mpi_send ( hs, 1, MPI_INTEGER, p, tag, comm, ier ) call CheckMPIReturn(subName,ier) endif enddo rstep = maxreq ! Send (and start receiving) data do istep=1,steps p = swapids(istep) ! Submit new rsend request if (sndlths(p) > 0) then tag = mytask + offset_t offset_s = sdispls(p)+1 call mpi_wait ( hs_rcvids(istep), status, ier ) call CheckMPIReturn(subName,ier) call mpi_rsend ( sndbuf(offset_s), sndlths(p), stypes(p), & p, tag, comm, ier ) call CheckMPIReturn(subName,ier) endif if (istep > maxreqh) then ! Wait for oldest irecv request to complete p = swapids(istep-maxreqh) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep-maxreqh), status, ier ) call CheckMPIReturn(subName,ier) endif if (rstep < steps) then rstep = rstep + 1 p = swapids(rstep) ! Submit a new handshake irecv request if (sndlths(p) > 0) then tag = mytask + offset_t call mpi_irecv( hs, 1, MPI_INTEGER, p, tag, comm, & hs_rcvids(rstep), ier ) call CheckMPIReturn(subName,ier) endif ! Submit a new irecv request if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(rstep), ier ) call CheckMPIReturn(subName,ier) call mpi_send ( hs, 1, MPI_INTEGER, p, tag, comm, ier ) call CheckMPIReturn(subName,ier) endif endif endif ! enddo ! wait for rest of receive requests to complete do istep=steps-maxreqh+1,steps p = swapids(istep) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep), status, ier ) call CheckMPIReturn(subName,ier) endif enddo ! (2) handshaking + nonblocking sends elseif ((handshake) .and. (.not. sendd)) then ! Initialize hs variable hs = 1 ! Post initial handshake receive requests do istep=1,maxreq p = swapids(istep) if (sndlths(p) > 0) then tag = mytask + offset_t call mpi_irecv( hs, 1, MPI_INTEGER, p, tag, comm, & hs_rcvids(istep), ier ) call CheckMPIReturn(subName,ier) endif enddo ! Post initial receive requests do istep=1,maxreq p = swapids(istep) if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(istep), ier ) call CheckMPIReturn(subName,ier) call mpi_send ( hs, 1, MPI_INTEGER, p, tag, comm, ier ) call CheckMPIReturn(subName,ier) endif enddo rstep = maxreq ! Send (and start receiving) data do istep=1,steps p = swapids(istep) ! Submit new irsend request if (sndlths(p) > 0) then tag = mytask + offset_t offset_s = sdispls(p)+1 call mpi_wait ( hs_rcvids(istep), status, ier ) call CheckMPIReturn(subName,ier) call mpi_irsend( sndbuf(offset_s), sndlths(p), stypes(p), & p, tag, comm, sndids(istep), ier ) call CheckMPIReturn(subName,ier) endif if (istep > maxreqh) then ! Wait for oldest irecv request to complete p = swapids(istep-maxreqh) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep-maxreqh), status, ier ) call CheckMPIReturn(subName,ier) endif if (rstep < steps) then rstep = rstep + 1 p = swapids(rstep) ! Submit a new handshake irecv request if (sndlths(p) > 0) then tag = mytask + offset_t call mpi_irecv( hs, 1, MPI_INTEGER, p, tag, comm, & hs_rcvids(rstep), ier ) call CheckMPIReturn(subName,ier) endif ! Submit a new irecv request if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(rstep), ier ) call CheckMPIReturn(subName,ier) call mpi_send ( hs, 1, MPI_INTEGER, p, tag, comm, ier ) call CheckMPIReturn(subName,ier) endif endif ! Wait for outstanding i(r)send request to complete p = swapids(istep-maxreqh) if (sndlths(p) > 0) then call mpi_wait( sndids(istep-maxreqh), status, ier ) call CheckMPIReturn(subName,ier) endif endif enddo ! wait for rest of send and receive requests to complete do istep=steps-maxreqh+1,steps p = swapids(istep) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep), status, ier ) call CheckMPIReturn(subName,ier) endif if (sndlths(p) > 0) then call mpi_wait( sndids(istep), status, ier ) call CheckMPIReturn(subName,ier) endif enddo ! (3) no handshaking + blocking sends elseif ((.not. handshake) .and. (sendd)) then ! Post receive requests do istep=1,maxreq p = swapids(istep) if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(istep), ier ) call CheckMPIReturn(subName,ier) endif enddo rstep = maxreq ! Send (and start receiving) data do istep=1,steps p = swapids(istep) ! Submit new send request if (sndlths(p) > 0) then tag = mytask + offset_t offset_s = sdispls(p)+1 call mpi_send( sndbuf(offset_s), sndlths(p), stypes(p), & p, tag, comm, ier ) call CheckMPIReturn(subName,ier) endif if (istep > maxreqh) then ! Wait for oldest irecv request to complete p = swapids(istep-maxreqh) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep-maxreqh), status, ier ) call CheckMPIReturn(subName,ier) endif ! Submit a new irecv request if (rstep < steps) then rstep = rstep + 1 p = swapids(rstep) if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(rstep), ier ) call CheckMPIReturn(subName,ier) endif endif endif enddo ! wait for rest of send and receive requests to complete do istep=steps-maxreqh+1,steps p = swapids(istep) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep), status, ier ) call CheckMPIReturn(subName,ier) endif enddo ! (4) no handshaking + nonblocking sends elseif ((.not. handshake) .and. (.not. sendd)) then ! Post receive requests do istep=1,maxreq p = swapids(istep) if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(istep), ier ) call CheckMPIReturn(subName,ier) endif enddo rstep = maxreq ! Send (and start receiving) data do istep=1,steps p = swapids(istep) ! Submit new isend request if (sndlths(p) > 0) then tag = mytask + offset_t offset_s = sdispls(p)+1 call mpi_isend( sndbuf(offset_s), sndlths(p), stypes(p), & p, tag, comm, sndids(istep), ier ) call CheckMPIReturn(subName,ier) endif if (istep > maxreqh) then ! Wait for oldest irecv request to complete p = swapids(istep-maxreqh) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep-maxreqh), status, ier ) call CheckMPIReturn(subName,ier) endif ! Submit a new irecv request if (rstep < steps) then rstep = rstep + 1 p = swapids(rstep) if (rcvlths(p) > 0) then tag = p + offset_t offset_r = rdispls(p)+1 call mpi_irecv( rcvbuf(offset_r), rcvlths(p), rtypes(p), & p, tag, comm, rcvids(rstep), ier ) call CheckMPIReturn(subName,ier) endif endif ! Wait for outstanding i(r)send request to complete p = swapids(istep-maxreqh) if (sndlths(p) > 0) then call mpi_wait( sndids(istep-maxreqh), status, ier ) call CheckMPIReturn(subName,ier) endif endif enddo ! wait for rest of send and receive requests to complete do istep=steps-maxreqh+1,steps p = swapids(istep) if (rcvlths(p) > 0) then call mpi_wait( rcvids(istep), status, ier ) call CheckMPIReturn(subName,ier) endif if (sndlths(p) > 0) then call mpi_wait( sndids(istep), status, ier ) call CheckMPIReturn(subName,ier) endif enddo endif #endif return end subroutine pio_swapm_{TYPE} ! !======================================================================== ! end module pio_spmd_utils