mp_lu_decomposition.fpp Source File


Source Code

module mp_lu_decomposition

#ifdef ISO_C_BINDING

   implicit none

   public :: lu_decomposition_local
   public :: lu_inverse_local
   public :: lu_matrix_multiply_local

   interface lu_decomposition_local
!    module procedure lu_decomposition_local_real
      module procedure lu_decomposition_local_complex
   end interface

   interface lu_inverse_local
!    module procedure lu_inverse_local_real
      module procedure lu_inverse_local_complex
   end interface

   interface lu_matrix_multiply_local
!    module procedure lu_decomposition_local_real
      module procedure lu_matrix_multiply_local_complex
   end interface

contains

   subroutine lu_decomposition_local_complex(mp_comm, root, win, lu, idx, d)

      use mpi

      implicit none

      integer, intent(in) :: win, mp_comm, root
      complex, dimension(:, :), intent(in out) :: lu
      integer, dimension(:), intent(out) :: idx
      real, intent(out) :: d

      real, parameter :: zero = 1.0e-20
      real, dimension(size(lu, 1)) :: vv
      complex, dimension(size(lu, 2)) :: dum

      integer :: i, j, k, n, imax, lo, hi
      integer :: iproc, nproc, ierr
      real :: dmax, tmp

      n = size(lu, 1)

      call mpi_comm_size(mp_comm, nproc, ierr)
      call mpi_comm_rank(mp_comm, iproc, ierr)

      d = 1.0
      !the following is a loop to avoid copying entire matrix
      ! with (cabs(lu))
      do i = 1, n
         vv(i) = maxval(cabs(lu(i, :)))
      end do
      if (any(vv == 0.0)) &
         write (*, *) 'singular matrix in lu_decomposition on process ', iproc
      vv = 1.0 / vv
      do j = 1, n
         !divide up the work using row_limits
         call split_n_tasks(n - j, iproc, nproc, lo, hi, llim=j + 1)

         !pivot if needed
         dmax = -1.0
         do k = j, n
            tmp = vv(k) * abs(lu(k, j))
            if (tmp > dmax) then
               dmax = tmp
               imax = k
            end if
         end do

         if (iproc == root) then
            idx(j) = imax
            if (j /= imax) then
               dum = lu(imax, :)
               lu(imax, :) = lu(j, :)
               lu(j, :) = dum
               vv(imax) = vv(j)
               d = -d
            end if
            if (lu(j, j) == 0.0) lu(j, j) = zero
         else
            if (j /= imax) vv(imax) = vv(j)
         end if

         call mpi_win_fence(0, win, ierr)

         !get the lead multiplier
         do i = lo, hi
            lu(i, j) = lu(i, j) / lu(j, j)
         end do

         call mpi_win_fence(0, win, ierr)

         do k = lo, hi
            do i = j + 1, n
               lu(i, k) = lu(i, k) - lu(i, j) * lu(j, k)
            end do
         end do

         call mpi_win_fence(0, win, ierr)
      end do

   end subroutine lu_decomposition_local_complex

   subroutine lu_inverse_local_complex(mp_comm, win, lu, idx, inverse)

      use linear_solve, only: lu_back_substitution

      implicit none

      integer, intent(in) :: win, mp_comm
      complex, dimension(:, :), intent(in) :: lu
      integer, dimension(:), intent(in) :: idx
      complex, dimension(:, :), intent(out) :: inverse

      integer :: i, n, nproc, iproc, ierr
      integer :: lo, hi

      n = size(lu, 1)

      call mpi_comm_size(mp_comm, nproc, ierr)
      call mpi_comm_rank(mp_comm, iproc, ierr)

      call split_n_tasks(n, iproc, nproc, lo, hi)

      do i = lo, hi
         inverse(i, :) = 0
         inverse(i, i) = 1.0
      end do

      call mpi_win_fence(0, win, ierr)

      do i = lo, hi
         call lu_back_substitution(lu, idx, inverse(:, i))
      end do

      call mpi_win_fence(0, win, ierr)

   end subroutine lu_inverse_local_complex

   subroutine lu_matrix_multiply_local_complex(mp_comm, win, mat, b)

      implicit none

      integer, intent(in) :: win, mp_comm
      complex, dimension(:, :), intent(in) :: mat
      complex, dimension(:), intent(out) :: b
      complex, dimension(size(b)) :: a

      integer :: i, n, nproc, iproc
      integer :: lo, hi, ierr

      n = size(mat, 1)

      call mpi_comm_size(mp_comm, nproc, ierr)
      call mpi_comm_rank(mp_comm, iproc, ierr)

      call split_n_tasks(n, iproc, nproc, lo, hi)

      do i = lo, hi
         a(i) = sum(mat(i, :) * b(:))
      end do

      call mpi_win_fence(0, win, ierr)

      do i = lo, hi
         b(i) = a(i)
      end do

      call mpi_win_fence(0, win, ierr)

   end subroutine lu_matrix_multiply_local_complex

   subroutine split_n_tasks(n, iproc, nproc, lo, hi, llim, blocksize, comm)

      implicit none

      integer, intent(in) :: n, iproc, nproc
      integer, intent(out) :: lo, hi
      integer, optional, intent(in) :: llim
      integer, optional, intent(in) :: blocksize
      logical, optional, intent(out) :: comm

      integer :: n_div, n_mod, llim_l, blocksize_l

      llim_l = 1
      if (present(llim)) llim_l = llim

      blocksize_l = 1
      if (present(blocksize)) blocksize_l = blocksize

      n_div = n / nproc
      n_mod = mod(n, nproc)

      if (n_div < blocksize_l) then
         lo = min(iproc * blocksize_l + llim_l, n + llim_l)
         hi = min(lo + blocksize_l - 1, n + llim_l - 1)
         if (present(comm)) then ! do we require an MPI_WIN_FENCE?
            comm = .not. (blocksize_l >= n)
         end if
      else
         lo = iproc * n_div + min(iproc, n_mod) + llim_l
         hi = lo + n_div - 1
         if (iproc < n_mod) hi = hi + 1
         if (present(comm)) then ! do we require an MPI_WIN_FENCE?
            comm = .not. (n_div + min(n_mod, 1) >= n)
         end if
      end if

   end subroutine split_n_tasks

#endif

end module mp_lu_decomposition