Introduction to Parallel Programming in Python
This post will briefly introduce how to use the multiprocessing module and the mpi4py module to achieve parallel programming in Python.
Multiprocessing
Traditionally, Python is considered to not support parallel
programming very well, partly because of the global interpreter lock
(GIL). However, things have changed over time. This part will briefly
introduce the multiprocessing module in Python, which effectively
side-steps the GIL by using subprocesses instead of threads. The
multiprocessing module provides many useful features and is very
suitable for symmetric multiprocessing (SMP) and shared-memory systems.
In this post, we focus on the Pool class of the multiprocessing module, which controls a pool of worker processes and supports both synchronous and asynchronous parallel execution.
The Pool class
Creating a Pool object
A Pool object can be created by passing the desired number of processes to the constructor:
import multiprocessing as mp
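For instance, a minimal sketch (sizing the pool to the number of CPU cores is just one reasonable choice):

```python
import multiprocessing as mp

nprocs = mp.cpu_count()              # number of worker processes, e.g. one per core
pool = mp.Pool(processes=nprocs)     # create the pool of worker processes

# ... submit work to the pool ...

pool.close()                         # no more tasks will be submitted
pool.join()                          # wait for the workers to finish
```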
The map method
To demonstrate the usage of the Pool class, let's define a simple function:
def square(x):
    return x * x
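The serial and parallel versions might then look like the following sketch (the pool size and the input values range(20) are illustrative choices):

```python
import multiprocessing as mp
# assumes square() is defined above in the same module

# the __main__ guard is needed when the "spawn" start method is used (e.g. on Windows/macOS)
if __name__ == "__main__":
    # serial version
    serial_result = [square(x) for x in range(20)]

    # parallel version: the inputs are distributed over the worker processes
    with mp.Pool(processes=mp.cpu_count()) as pool:
        parallel_result = pool.map(square, range(20))

    print(serial_result)
    print(parallel_result)
```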
The above parallel code will print exactly the same result as the serial code, but the computations are actually distributed and executed in parallel on the worker processes.
The starmap method
The map method is only applicable to computational routines that accept a single argument. For routines that accept multiple arguments, you can use the starmap method:
def power_n(x, n):
    return x ** n
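A minimal sketch of calling it through starmap (the exponent 2 and the pool size are illustrative choices):

```python
import multiprocessing as mp
# assumes power_n() is defined above in the same module

if __name__ == "__main__":
    # each (x, n) tuple is unpacked into the two arguments of power_n
    args = [(x, 2) for x in range(20)]
    with mp.Pool(processes=mp.cpu_count()) as pool:
        result = pool.starmap(power_n, args)
    print(result)
```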
Note that both map and starmap are synchronous methods. This may lead to performance degradation if the workload is not well balanced among the worker processes.
The apply_async method
The Pool class also provides the apply_async method, which makes asynchronous execution on the worker processes possible. The apply_async method executes the routine only once. Therefore, in the previous example, we would need to define another routine, power_n_list, that computes the values of a list of numbers raised to a particular power:
def power_n_list(x_list, n):
    return [x ** n for x in x_list]
To use the apply_async method, we also need to divide the whole input list range(20) into sub-lists (which are known as slices) and distribute them to the worker processes. The slices can be prepared by the following slice_data function:
def slice_data(data, nprocs):
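One possible implementation is sketched below; the round-robin striding is an assumption, and any scheme that splits the data into nprocs pieces would work:

```python
def slice_data(data, nprocs):
    # distribute the elements round-robin: slice i gets elements i, i + nprocs, ...
    return [data[i::nprocs] for i in range(nprocs)]
```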
Then we can pass the power_n_list routine and the sliced input lists to the apply_async method:
inp_lists = slice_data(range(20), nprocs)
The actual result can be obtained using the get method and a nested list comprehension:
result = [x for p in multi_result for x in p.get()]
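Putting the pieces together, the whole apply_async workflow might look like the following sketch (the pool size, the exponent 2, and the slicing scheme are the assumptions made above):

```python
import multiprocessing as mp
# assumes power_n_list() and slice_data() are defined above in the same module

if __name__ == "__main__":
    nprocs = mp.cpu_count()
    inp_lists = slice_data(range(20), nprocs)

    with mp.Pool(processes=nprocs) as pool:
        # submit one asynchronous task per slice
        multi_result = [pool.apply_async(power_n_list, (inp, 2)) for inp in inp_lists]
        # get() blocks until the corresponding task has finished
        result = [x for p in multi_result for x in p.get()]

    print(result)
```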
Example: Computing π
The formula for computing π used here is the integral π = ∫₀¹ 4 / (1 + x²) dx, which can be approximated numerically by summing 4 / (1 + xᵢ²) · Δx over nsteps sub-intervals of width Δx = 1/nsteps.
Serial code
For example, we can choose to use 10 million points:
nsteps = 10000000
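A serial sketch of the computation (assuming the midpoint-rule quadrature of 4 / (1 + x²) described above):

```python
nsteps = 10000000
dx = 1.0 / nsteps

pi = 0.0
for i in range(nsteps):
    x = (i + 0.5) * dx               # midpoint of the i-th sub-interval
    pi += 4.0 / (1.0 + x * x) * dx

print(pi)
```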
Parallel code
To parallelize the serial code, we need to divide the for loop into sub-tasks and distribute them to the worker processes. For example:
def calc_partial_pi(rank, nprocs, nsteps, dx):
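One possible sketch of this function is shown below; the round-robin assignment of steps to ranks and the midpoint rule are assumptions:

```python
def calc_partial_pi(rank, nprocs, nsteps, dx):
    # each worker handles the steps rank, rank + nprocs, rank + 2*nprocs, ...
    partial_pi = 0.0
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx
        partial_pi += 4.0 / (1.0 + x * x) * dx
    return partial_pi
```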
With the calc_partial_pi function we can prepare the input arguments for the sub-tasks and compute the value of π using the starmap method:
nprocs = mp.cpu_count()
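A minimal sketch of the synchronous version (reusing calc_partial_pi as sketched above):

```python
import multiprocessing as mp
# assumes calc_partial_pi() is defined above in the same module

if __name__ == "__main__":
    nprocs = mp.cpu_count()
    nsteps = 10000000
    dx = 1.0 / nsteps

    inputs = [(rank, nprocs, nsteps, dx) for rank in range(nprocs)]
    with mp.Pool(processes=nprocs) as pool:
        partial_pi = pool.starmap(calc_partial_pi, inputs)

    print(sum(partial_pi))           # the partial sums add up to pi
```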
Or we can use asynchronous parallel calculation with the apply_async method:
multi_result = [pool.apply_async(calc_partial_pi, inp) for inp in inputs]
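The asynchronous results can then be collected with get and summed; a short sketch, assuming the same inputs list and pool as in the starmap version:

```python
# inside the same "with mp.Pool(...) as pool:" block as above
multi_result = [pool.apply_async(calc_partial_pi, inp) for inp in inputs]
pi = sum(p.get() for p in multi_result)
print(pi)
```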
The Process class
The Process class makes it possible to have direct control over individual processes. A process can be created by providing a target function and its input arguments to the Process constructor. The process can then be started with the start method, and the join method waits for it to finish. For example:
import multiprocessing as mp
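A minimal sketch (the greet function and the number of processes are illustrative):

```python
import multiprocessing as mp

def greet(name):
    print(f"Hello, {name}, from {mp.current_process().name}")

if __name__ == "__main__":
    procs = [mp.Process(target=greet, args=(f"worker-{i}",)) for i in range(4)]
    for p in procs:
        p.start()                    # launch the process
    for p in procs:
        p.join()                     # wait for the process to finish
```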
Process and Queue
In practice, we often want the function running in a process to return the computed result, rather than just printing it as in the previous example. This can be achieved with help from the Queue class in the multiprocessing module.
The Queue class includes the put method for depositing data and the get method for retrieving data. The code in the earlier example can be changed to:
import multiprocessing as mp
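A minimal sketch of returning a result through a Queue (the squaring function and the input value are illustrative):

```python
import multiprocessing as mp

def square_to_queue(x, qout):
    qout.put(x * x)                  # deposit the result instead of printing it

if __name__ == "__main__":
    qout = mp.Queue()
    p = mp.Process(target=square_to_queue, args=(3, qout))
    p.start()
    result = qout.get()              # retrieve the result deposited by the process
    p.join()
    print(result)
```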
Or with multiple processes:
import multiprocessing as mp
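A sketch with several processes writing to the same queue (reusing square_to_queue from above; eight inputs are an illustrative choice):

```python
import multiprocessing as mp
# assumes square_to_queue() is defined above in the same module

if __name__ == "__main__":
    qout = mp.Queue()
    procs = [mp.Process(target=square_to_queue, args=(x, qout)) for x in range(8)]
    for p in procs:
        p.start()
    results = [qout.get() for _ in procs]    # arrival order depends on which process finishes first
    for p in procs:
        p.join()
    print(results)
```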
From the result, we can see that the execution of the processes is asynchronous. To make sure that the order of the output matches the order of the input, we can use a "process index" to sort the results. For example:
import multiprocessing as mp
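A sketch of tagging each result with a process index and sorting afterwards:

```python
import multiprocessing as mp

def square_to_queue_indexed(idx, x, qout):
    qout.put((idx, x * x))           # tag the result with the process index

if __name__ == "__main__":
    qout = mp.Queue()
    procs = [mp.Process(target=square_to_queue_indexed, args=(i, x, qout))
             for i, x in enumerate(range(8))]
    for p in procs:
        p.start()
    unordered = [qout.get() for _ in procs]
    for p in procs:
        p.join()
    results = [value for idx, value in sorted(unordered)]   # restore the input order
    print(results)
```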
Example: computing π again
Similarly:
def calc_partial_pi(rank, nprocs, nsteps, dx, qout):
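A sketch of the full Process/Queue version (the striding and midpoint rule are the same assumptions as before):

```python
import multiprocessing as mp

def calc_partial_pi(rank, nprocs, nsteps, dx, qout):
    partial_pi = 0.0
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx
        partial_pi += 4.0 / (1.0 + x * x) * dx
    qout.put((rank, partial_pi))     # return the partial sum through the queue

if __name__ == "__main__":
    nprocs = mp.cpu_count()
    nsteps = 10000000
    dx = 1.0 / nsteps

    qout = mp.Queue()
    procs = [mp.Process(target=calc_partial_pi, args=(rank, nprocs, nsteps, dx, qout))
             for rank in range(nprocs)]
    for p in procs:
        p.start()
    partial_sums = [qout.get() for _ in procs]
    for p in procs:
        p.join()
    print(sum(partial for rank, partial in partial_sums))
```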
mpi4py
The limitation of the multiprocessing module is that it does not support parallelization over multiple compute nodes (i.e. on distributed-memory systems). To overcome this limitation and enable cross-node parallelization, we can use MPI for Python, that is, the mpi4py module. This module provides an object-oriented interface that resembles the message passing interface (MPI), and hence allows Python programs to exploit multiple processors on multiple compute nodes. The mpi4py module supports both point-to-point and collective communications for Python objects as well as buffer-like objects.
Basics of mpi4py
Our first Python program with mpi4py:
from mpi4py import MPI
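A minimal "hello world" sketch:

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD                # the default communicator
rank = comm.Get_rank()               # rank of this process within the communicator
size = comm.Get_size()               # total number of processes

print(f"Hello from rank {rank} out of {size} processes")
```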
In parallel programming with MPI, we need a so-called communicator, which is a group of processes that can talk to each other. To identify the processes within that group, each process is assigned a rank that is unique within the communicator. The total number of processes is often referred to as the size of the communicator.
In the above code, we defined the variable comm as the default communicator MPI.COMM_WORLD. The rank of each process is retrieved via the Get_rank method of the communicator, and the size of the communicator is obtained by the Get_size method.
The usual way to execute an mpi4py code in parallel is to use mpirun, for example:

mpirun -n 4 python3 hello.py

will run the code on 4 processes.
Point-to-point communication
For point-to-point communication between Python objects, mpi4py provides the send and recv methods that are similar to those in MPI. For example:
from mpi4py import MPI
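A minimal sketch of sending a Python object from rank 0 to rank 1 (the dictionary and the tag value are illustrative):

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {"x": 1.0, "label": "hello"}      # any picklable Python object
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
    print(f"rank 1 received {data}")
```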
Note that the above example uses blocking communication methods, which means that the execution of the code will not proceed until the communication is completed. To achieve non-blocking communication, use isend and irecv. They immediately return Request objects, and we can use the wait method to manage the completion of the communication:
if rank == 0:
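A sketch of the same exchange with the non-blocking methods:

```python
if rank == 0:
    data = {"x": 1.0, "label": "hello"}
    req = comm.isend(data, dest=1, tag=11)
    req.wait()                       # returns once the send has completed
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    data = req.wait()                # wait() returns the received object
    print(f"rank 1 received {data}")
```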
Collective communication
In parallel programming, it is often useful to perform what is known as collective communication, for example, broadcasting a Python object from the master process to all the worker processes. The example code below broadcasts a NumPy array using the bcast method.
from mpi4py import MPI
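A minimal sketch (the array length is illustrative):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(8, dtype=np.float64)    # array to broadcast
else:
    data = None

data = comm.bcast(data, root=0)              # every rank now holds a copy
print(f"rank {rank} has {data}")
```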
If one needs to divide a task and distribute the sub-tasks to the processes, the scatter method will be useful. Note that it is not possible to distribute more elements than the number of processes. If one has a big list or array, it is necessary to make slices of the list or array before calling the scatter method:
from mpi4py import MPI
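A minimal sketch, assuming the list range(20) is sliced round-robin before being scattered:

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    data = list(range(20))
    # one slice per process; scatter sends exactly one element per process
    slices = [data[i::size] for i in range(size)]
else:
    slices = None

my_slice = comm.scatter(slices, root=0)
print(f"rank {rank} received {my_slice}")
```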
The gather method does the opposite of scatter. If each process has an element, one can use gather to collect them into a list of elements on the master process. The code below uses scatter and gather to compute π:
from mpi4py import MPI
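A sketch of the complete computation (the slicing of step indices and the midpoint rule are assumptions):

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

nsteps = 10000000
dx = 1.0 / nsteps

if rank == 0:
    # one slice of step indices per process
    slices = [range(r, nsteps, size) for r in range(size)]
else:
    slices = None

my_steps = comm.scatter(slices, root=0)

partial_pi = 0.0
for i in my_steps:
    x = (i + 0.5) * dx
    partial_pi += 4.0 / (1.0 + x * x) * dx

partial_sums = comm.gather(partial_pi, root=0)
if rank == 0:
    print(sum(partial_sums))
```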
Buffer-like objects
The mpi4py module provides methods for directly sending and receiving buffer-like objects. The advantage of working with buffer-like objects is that the communication is fast. However, the user needs to be more explicit when it comes to handling the allocation of memory space. For example, the memory of the receiving buffer needs to be allocated prior to the communication, and the size of the sending buffer should not exceed that of the receiving buffer. One should also be aware that mpi4py expects the buffer-like objects to have contiguous memory.
In mpi4py, a buffer-like object can be specified using a list or tuple with 2 or 3 elements (or 4 elements for the vector variants), for example [data, MPI.DOUBLE] or [data, count, MPI.DOUBLE].
Point-to-point communication
Here is the example code that corresponds to the previous one, but using buffer-like objects:
from mpi4py import MPI
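A sketch of the buffer-based exchange (the array length and the tag are illustrative):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

n = 8
if rank == 0:
    data = np.arange(n, dtype=np.float64)
    comm.Send([data, MPI.DOUBLE], dest=1, tag=77)
elif rank == 1:
    data = np.zeros(n, dtype=np.float64)     # receiving buffer allocated in advance
    comm.Recv([data, MPI.DOUBLE], source=0, tag=77)
    print(f"rank 1 received {data}")
```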
Note that the data array was initialized on the worker processes before the Recv method was called. When using the Send and Recv methods, one thing to keep in mind is that the sending buffer and the receiving buffer should match in size.
In mpi4py, the non-blocking communication methods for buffer-like objects are Isend and Irecv. The use of the non-blocking methods is shown in the example below:
if rank == 0:
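A sketch of the same exchange with the non-blocking buffer methods (continuing with the arrays from the previous sketch):

```python
if rank == 0:
    data = np.arange(n, dtype=np.float64)
    req = comm.Isend([data, MPI.DOUBLE], dest=1, tag=77)
    req.Wait()                       # the send buffer may be reused after Wait()
elif rank == 1:
    data = np.zeros(n, dtype=np.float64)
    req = comm.Irecv([data, MPI.DOUBLE], source=0, tag=77)
    req.Wait()                       # data is valid only after Wait() returns
    print(f"rank 1 received {data}")
```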
Collective communication
from mpi4py import MPI
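As a sketch, broadcasting a NumPy array with the buffer-based Bcast might look like this (the array length and contents are illustrative):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

data = np.zeros(8, dtype=np.float64)         # the buffer must exist on every rank
if rank == 0:
    data[:] = np.arange(8)

comm.Bcast([data, MPI.DOUBLE], root=0)       # in-place broadcast into each rank's buffer
print(f"rank {rank} has {data}")
```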
Another collective communication method is Scatter, which sends slices of a large array to the worker processes. However, Scatter is not as convenient as scatter in practice, since it requires the size of the large array to be divisible by the number of processes. In practice, the size of the array is not known beforehand and is therefore not guaranteed to be divisible by the number of available processes. It is more practical to use Scatterv, the vector version of Scatter, which offers a much more flexible way to distribute the array. For example:
from mpi4py import MPI
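A sketch of distributing an array whose length is not divisible by the number of processes (the array length 10 is illustrative):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n = 10                                        # total number of elements
if rank == 0:
    sendbuf = np.arange(n, dtype=np.float64)
    # split as evenly as possible: the first n % size ranks get one extra element
    count = np.array([n // size + (1 if r < n % size else 0) for r in range(size)],
                     dtype=int)
    displ = np.array([sum(count[:r]) for r in range(size)], dtype=int)
else:
    sendbuf = None
    count = np.zeros(size, dtype=int)
    displ = None

comm.Bcast(count, root=0)                     # every rank needs to know its own count
recvbuf = np.zeros(count[rank], dtype=np.float64)

comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0)
print(f"rank {rank} received {recvbuf}")
```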
Gatherv is the reverse operation of Scatterv. When using Gatherv, one needs to specify the receiving buffer as [recvbuf2, count, displ, MPI.DOUBLE]:
sendbuf2 = recvbuf
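A sketch that reverses the Scatterv above, collecting the slices back on the master process (reusing recvbuf, count, displ, and n from the previous sketch):

```python
sendbuf2 = recvbuf                            # each rank sends back the slice it received

if rank == 0:
    recvbuf2 = np.zeros(n, dtype=np.float64)
    comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)
    print(f"rank 0 gathered {recvbuf2}")
else:
    comm.Gatherv(sendbuf2, None, root=0)
```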
Reduce, on the other hand, can be used to compute the sum of (or to perform another operation on) the data collected from all processes. For example:
partial_sum = np.zeros(1)
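A minimal sketch of summing one number from each process on the master process (the per-rank value is illustrative):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

partial_sum = np.zeros(1)
partial_sum[0] = rank + 1.0                   # each rank contributes its own value

total_sum = np.zeros(1)                       # the result ends up on the root process
comm.Reduce(partial_sum, total_sum, op=MPI.SUM, root=0)

if rank == 0:
    print(f"total = {total_sum[0]}")          # 1 + 2 + ... + size
```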