Tutorial 3 - Scattering and Gathering data

Scattering and gathering numpy arrays

As well as broadcasting and transferring objects, we often want to split data across nodes for analysis and then collect the results afterwards. This is done using the scatter_array and gather functions.

In this script, we look at two ways of scattering data and then how to gather the data back up for consolidation:

#!/usr/bin/python3

import sys

import numpy as np

from anamnesis import MPIHandler

# All nodes must perform this
m = MPIHandler(use_mpi=True)

# We need at least three nodes for this
if m.size < 3:
    print("Error: This example needs at least three MPI nodes")
    m.done()
    sys.exit(1)

# Create a matrix of data for scattering
# Pretend that we have 300 points of data which we want to scatter,
# each of which is a vector of dimension 20

# This creates a matrix containing 0-19 in row 1,
# 100-119 in row 2, etc
data_dim = 20
num_pts = 300

if m.rank == 0:
    # We are the master node
    print("Master node")

    data = np.tile(np.arange(num_pts) * 100, (data_dim, 1)).T + np.arange(data_dim)[None, :]

    print("Master node: Full data array: ({}, {})".format(*data.shape))

    # 1. Scatter using scatter_array
    m1_data = m.scatter_array(data)

    print("Master node M1: m1_data shape: ({}, {})".format(*m1_data.shape))

    # 2. Scatter manually, using indices

    # Send the data to all nodes
    m.bcast(data)

    # Calculate which indices each node should work on and send them around
    scatter_indices = m.get_scatter_indices(data.shape[0])
    m.bcast(scatter_indices)

    indices = range(*scatter_indices[m.rank])

    m2_data = data[indices, :]

    print("Master node M2: m2_data shape: ({}, {})".format(*m2_data.shape))

    # 3. Gather using the gather function

    # Create some fake data to gather
    ret_data = (np.arange(m2_data.shape[0]) + m.rank * 100)[:, None]

    print("Master node: data to gather shape: ({}, {})".format(*ret_data.shape))
    print("Master node: first 10 elements: ", ret_data[0:10, 0])

    all_ret_data = m.gather(ret_data)

    print("Master node: gathered data shape: ({}, {})".format(*all_ret_data.shape))

    print("all_ret_data 0:10: ", all_ret_data[0:10, 0])
    print("all_ret_data 100:110: ", all_ret_data[100:110, 0])
    print("all_ret_data 200:210: ", all_ret_data[200:210, 0])

else:
    # We are a slave node
    print("Slave node {}".format(m.rank))

    # 1. Scatter using scatter_array
    m1_data = m.scatter_array(None)

    print("Slave node {} M1: data shape: ({}, {})".format(m.rank, *m1_data.shape))

    # 2. Scatter manually, using indices
    # Receive the full dataset
    data = m.bcast()

    # Get our indices
    scatter_indices = m.bcast()

    # Extract our data to work on
    indices = range(*scatter_indices[m.rank])

    m2_data = data[indices, :]

    print("Slave node {} M2: data shape: ({}, {})".format(m.rank, *m2_data.shape))

    # 3. Gather using the gather function

    # Create some fake data to gather
    ret_data = (np.arange(m2_data.shape[0]) + m.rank * 100)[:, None]

    print("Slave node {}: data to gather shape: ({}, {})".format(m.rank, *ret_data.shape))
    print("Slave node {}: first 10 elements: ".format(m.rank), ret_data[0:10, 0])

    m.gather(ret_data)

# We need to make sure that we finalise MPI otherwise
# we will get an error on exit
m.done()

Scattering Method 1: scatter_array

The simplest way to scatter data is to use the scatter_array function. This function always splits along the first dimension: if you have three nodes and a dataset of shape (100, 15, 23), the first node will receive data of shape (34, 15, 23) and the remaining two nodes (33, 15, 23) each.

The code will automatically split the array unequally if necessary.
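The chunking arithmetic can be illustrated with plain numpy (this mirrors the split described above; it is an illustration, not the library's internal implementation):

```python
import numpy as np

# A dataset matching the example in the text:
# 100 samples, each a (15, 23) block
data = np.zeros((100, 15, 23))

# scatter_array splits along the first dimension; numpy's array_split
# shows the same uneven chunking for three nodes, with the larger
# chunk going to the first node
chunks = np.array_split(data, 3, axis=0)

for i, chunk in enumerate(chunks):
    print("node {}: {}".format(i, chunk.shape))
```

Running this prints (34, 15, 23) for node 0 and (33, 15, 23) for nodes 1 and 2.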

Scattering Method 2: indices

It is sometimes more useful to broadcast an entire dataset to all nodes using bcast and then have the nodes split the data up themselves (for instance, if they need all of the data for part of the computation but should only work on some of the data for the full computation).

To do this, we can use the get_scatter_indices function. This must be called with the number of items which we are “scattering” (the size of the first dimension). In the example in the text above, we would call this function with the argument 100. The function then returns a list containing a set of arguments to pass to the range function. In the example above, this would be:

[(0, 34), (34, 67), (67, 100)]

There is an entry in the list for each MPI node. We broadcast this list to all MPI nodes which are then responsible for extracting just the part of the data required, for example (assuming that m is our MPIHandler):

all_indices = m.bcast(None)

my_indices = range(*all_indices[m.rank])

Note that these indices are congruent with the indices used during gather, so you can safely gather data which has been manually collated in this way.
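To make the index scheme concrete, here is a plain-Python sketch of how such (start, stop) tuples could be computed (a hypothetical helper for illustration, not the library's own implementation):

```python
def make_scatter_indices(n_items, n_nodes):
    """Split n_items indices into n_nodes contiguous (start, stop)
    tuples, giving the earlier nodes one extra item when the split
    is uneven."""
    base, extra = divmod(n_items, n_nodes)
    indices = []
    start = 0
    for node in range(n_nodes):
        stop = start + base + (1 if node < extra else 0)
        indices.append((start, stop))
        start = stop
    return indices

print(make_scatter_indices(100, 3))  # [(0, 34), (34, 67), (67, 100)]
```

Note that this reproduces the list shown above for 100 items on three nodes, and that the per-node ranges are contiguous and in rank order, which is what makes them congruent with gather.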

Gathering numpy arrays

Gathering arrays is straightforward: use the gather function, passing in the partial array from each node. There is an example of this in test_script3a.py above. (Note that by default the data is gathered to the root node.)
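Conceptually, the net effect of gathering on the root node is equivalent to concatenating the per-rank pieces along the first dimension, in rank order (a numpy illustration of the result, not of the MPI mechanics):

```python
import numpy as np

# Per-node partial results with uneven first dimensions, as produced
# by the scatter step (34 + 33 + 33 = 100 rows in total)
parts = [np.ones((34, 5)), np.ones((33, 5)), np.ones((33, 5))]

# Gathering stacks the partial arrays back together along the
# first dimension, in rank order
full = np.concatenate(parts, axis=0)
print(full.shape)  # (100, 5)
```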

Scattering and gathering lists

Scattering and gathering lists is similar to the process for arrays. There are two differences. The first is that you need to use the scatter_list and gather_list routines. The second is that the gather_list routine needs to be told the total length of the combined list, and on nodes where you want to receive the full list (including the master), you must pass return_all as True (the default is False).
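The list round trip can be sketched in plain Python (split_list is a hypothetical helper shown for illustration; it mirrors the contiguous, rank-ordered chunking described above rather than reproducing the library's internals):

```python
def split_list(items, n_nodes):
    """Chunk a list into n_nodes contiguous pieces, with larger
    pieces first when the split is uneven."""
    base, extra = divmod(len(items), n_nodes)
    chunks, start = [], 0
    for node in range(n_nodes):
        stop = start + base + (1 if node < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks

data = [str(x) for x in range(300)]
chunks = split_list(data, 3)
print([len(c) for c in chunks])  # [100, 100, 100]

# Reassembling the chunks in rank order recovers the original list;
# the total length (300 here) is what gather_list must be told
assert sum(chunks, []) == data
```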

An example script can be seen below:

#!/usr/bin/python3

import sys

from anamnesis import MPIHandler

# All nodes must perform this
m = MPIHandler(use_mpi=True)

# We need at least three nodes for this
if m.size < 3:
    print("Error: This example needs at least three MPI nodes")
    m.done()
    sys.exit(1)

# Create a list of data for scattering
# Pretend that we have 300 points of data which we want to scatter
num_pts = 300

if m.rank == 0:
    # We are the master node
    print("Master node")

    data = [str(x) for x in range(num_pts)]

    print("Master node: Full data list: len: {}".format(len(data)))

    # Scatter using scatter_list
    m1_data = m.scatter_list(data)

    print("Master node M1: m1_data len: {}".format(len(m1_data)))

    # Gather list back together again
    all_ret_data = m.gather_list(m1_data, num_pts, return_all=True)

    print("Master node: gathered list len: {}".format(len(all_ret_data)))

    print("all_ret_data 0:10: ", all_ret_data[0:10])
    print("all_ret_data 100:110: ", all_ret_data[100:110])
    print("all_ret_data 200:210: ", all_ret_data[200:210])

else:
    # We are a slave node
    print("Slave node {}".format(m.rank))

    # Scatter using scatter_list
    m1_data = m.scatter_list(None)

    print("Slave node {}: data len: {}".format(m.rank, len(m1_data)))

    # Gather using the gather_list function
    m.gather_list(m1_data, num_pts)

# We need to make sure that we finalise MPI otherwise
# we will get an error on exit
m.done()