Operações de Redução em MPI¶
Nesta aula, exploraremos o conceito de redução distribuída de dados em ambientes de computação paralela utilizando a biblioteca MPI (Message Passing Interface). Nosso objetivo é compreender como distribuir tarefas entre múltiplos processos, coletar os resultados parciais e, por fim, consolidar esses resultados de forma eficiente.
Reduzir é um conceito clássico da programação funcional. A redução de dados envolve a redução de um conjunto de números em um conjunto menor de números por meio de uma função. Por exemplo, digamos que temos uma lista de números [1, 2, 3, 4, 5]. Reduzir esta lista de números com a função sum produziria sum([1, 2, 3, 4, 5]) = 15. Da mesma forma, a redução da multiplicação resultaria em multiplicar([1, 2, 3, 4, 5]) = 120.
Como você deve ter imaginado, pode ser muito complicado aplicar funções de redução em um conjunto de números distribuídos. Junto com isso, é difícil programar de forma eficiente reduções não comutativas, ou seja, reduções que devem ocorrer em uma ordem definida. Felizmente, o MPI tem uma função útil chamada MPI_Reduce que irá lidar com quase todas as reduções comuns que um programador precisa fazer em um aplicativo paralelo.
Aproximação de PI com Comunicação Ponto-a-Ponto¶
Neste exemplo, cada processo MPI (exceto o coordenador) realiza uma simulação de Monte Carlo para estimar o valor de π. Cada processo gera coordenadas aleatórias (x, y) dentro do quadrado unitário e verifica se o ponto está dentro do quarto de círculo. O número de acertos é enviado ao processo coordenador por meio de MPI_Send
, que os coleta com MPI_Recv
e calcula π pela fórmula 𝜋 ≈ 4 * (pontos dentro do círculo / total de pontos).
#include <iostream>
#include <vector>
#include <cstdlib>
#include <cmath>
#include <mpi.h>
#define SEED_MPI 35791246
int main(int argc, char* argv[]) {
const long niter = 10000000;
int myid, nodenum, count = 0;
double x, y, z, pi;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &nodenum);
std::vector<int> received(nodenum, 0);
std::vector<long> recvniter(nodenum, 0);
std::srand(SEED_MPI + myid);
if (myid != 0) {
for (long i = 0; i < niter; ++i) {
x = static_cast<double>(std::rand()) / RAND_MAX;
y = static_cast<double>(std::rand()) / RAND_MAX;
z = std::sqrt(x * x + y * y);
if (z <= 1.0) count++;
}
for (int i = 0; i < nodenum; ++i) {
MPI_Send(&count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
MPI_Send(&niter, 1, MPI_LONG, 0, 2, MPI_COMM_WORLD);
}
} else {
for (int i = 1; i < nodenum; ++i) {
MPI_Recv(&received[i], 1, MPI_INT, i, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&recvniter[i], 1, MPI_LONG, i, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
int finalcount = 0;
long finalniter = 0;
for (int i = 1; i < nodenum; ++i) {
finalcount += received[i];
finalniter += recvniter[i];
}
pi = static_cast<double>(finalcount) / finalniter * 4.0;
std::cout << "Pi: " << pi << std::endl;
}
MPI_Finalize();
return 0;
}
pi_send_receive.cpp
distribui o cálculo da aproximação de π entre múltiplos processos MPI. Cada processo-executor realiza um número fixo de iterações e envia os resultados para o processo rank 0 usando MPI_Send
. O rank 0 coleta esses dados com MPI_Recv
e realiza a agregação final.
Use mpic++
para compilar seus programas:
mpic++ pi_send_receive.cpp -o pi_send_receive
Script de Submissão SLURM
#!/bin/bash
#SBATCH --job-name=mpi_pi
#SBATCH --output=output_pi.txt
#SBATCH --ntasks=16
#SBATCH --time=00:05:00
#SBATCH --partition=normal
mpirun -np $SLURM_NTASKS ./pi_send_receive
Submeta com:
sbatch job_pi.sh
Agregação com MPI_Reduce¶
Neste segundo exemplo, cada processo gera números aleatórios e calcula a soma local. Em vez de enviar esses valores individualmente ao rank 0, todos os processos participam de uma operação de redução coletiva com MPI_Reduce
. A operação soma (MPI_SUM
) os valores locais e entrega o total apenas ao rank 0, que então calcula a média global.
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <mpi.h>
std::vector<float> create_rand_nums(int num_elements) {
std::vector<float> rand_nums(num_elements);
for (int i = 0; i < num_elements; ++i) {
rand_nums[i] = static_cast<float>(std::rand()) / RAND_MAX;
}
return rand_nums;
}
int main(int argc, char** argv) {
if (argc != 2) {
std::cerr << "Usage: avg num_elements_per_proc" << std::endl;
return 1;
}
int num_elements_per_proc = std::atoi(argv[1]);
MPI_Init(nullptr, nullptr);
int world_rank, world_size;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
std::srand(static_cast<unsigned int>(std::time(nullptr)) * (world_rank + 1));
std::vector<float> rand_nums = create_rand_nums(num_elements_per_proc);
float local_sum = 0.0f;
for (float val : rand_nums) {
local_sum += val;
}
std::cout << "Local sum for process " << world_rank
<< " = " << local_sum
<< ", avg = " << (local_sum / num_elements_per_proc) << std::endl;
float global_sum = 0.0f;
MPI_Reduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);
if (world_rank == 0) {
float global_avg = global_sum / (world_size * num_elements_per_proc);
std::cout << "Total sum = " << global_sum
<< ", global avg = " << global_avg << std::endl;
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
return 0;
}
O que são primitivas coletivas?¶
Primitivas coletivas são funções da biblioteca MPI que envolvem todos os processos de um grupo (geralmente MPI_COMM_WORLD
) de forma coordenada. Ao contrário das comunicações ponto-a-ponto, em que processos individuais trocam mensagens diretamente entre si (MPI_Send
e MPI_Recv
), as primitivas coletivas realizam operações em grupo — como distribuição de dados, coleta de resultados ou sincronização.
Entre as principais primitivas coletivas, temos:
MPI_Bcast
: envio de dados de um processo para todos os outrosMPI_Gather
/MPI_Scatter
: coleta ou distribuição de dados entre processosMPI_Reduce
/MPI_Allreduce
: agregação (redução) de valores de todos os processosMPI_Barrier
: sincronização de todos os processos
Essas funções são otimizadas internamente para oferecer alto desempenho em ambientes paralelos e facilitam a implementação de algoritmos distribuídos.
A função MPI_Reduce
permite que cada processo envie seus dados diretamente para uma operação de redução (como soma, mínimo, máximo) que é executada automaticamente no processo raiz. Isso elimina a necessidade de MPI_Send
e MPI_Recv
manuais, tornando o código mais limpo e eficiente.
Este proximo exemplo estende o uso das primitivas coletivas para um cenário em que todos os processos precisam conhecer o resultado global. Utilizamos MPI_Allreduce
para calcular a média global e, em seguida, MPI_Reduce
para somar os desvios quadráticos ao redor da média. O desvio padrão é calculado no rank0 a partir desses valores.
#include <iostream>
#include <vector>
#include <cstdlib>
#include <cmath>
#include <ctime>
#include <cassert>
#include <mpi.h>
std::vector<float> create_rand_nums(int num_elements) {
std::vector<float> rand_nums(num_elements);
for (int i = 0; i < num_elements; ++i) {
rand_nums[i] = static_cast<float>(std::rand()) / RAND_MAX;
}
return rand_nums;
}
int main(int argc, char** argv) {
if (argc != 2) {
std::cerr << "Usage: avg num_elements_per_proc" << std::endl;
return 1;
}
int num_elements_per_proc = std::atoi(argv[1]);
MPI_Init(nullptr, nullptr);
int world_rank, world_size;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
std::srand(static_cast<unsigned int>(std::time(nullptr)) * (world_rank + 1));
std::vector<float> rand_nums = create_rand_nums(num_elements_per_proc);
float local_sum = 0.0f;
for (float num : rand_nums)
local_sum += num;
float global_sum = 0.0f;
MPI_Allreduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
float mean = global_sum / (num_elements_per_proc * world_size);
float local_sq_diff = 0.0f;
for (float num : rand_nums)
local_sq_diff += (num - mean) * (num - mean);
float global_sq_diff = 0.0f;
MPI_Reduce(&local_sq_diff, &global_sq_diff, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);
if (world_rank == 0) {
float stddev = std::sqrt(global_sq_diff / (num_elements_per_proc * world_size));
std::cout << "Mean = " << mean << ", Standard deviation = " << stddev << std::endl;
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
return 0;
}
Enquanto MPI_Reduce
entrega o resultado da operação ao processo rank0, MPI_Allreduce
entrega o mesmo resultado a todos os processos. Isso é útil quando todos os nós precisam do valor reduzido, como ao calcular desvios padrão, médias globais e sincronizações de estado.
O código para cálculo do desvio padrão utiliza duas reduções: uma com MPI_Allreduce
para obter a média global e outra com MPI_Reduce
para calcular a soma dos desvios quadráticos.
Sua vez!¶
1- Refatore o código do primeiro exemplo para utilizar MPI_Reduce
ao invés de MPI_Send
e MPI_Recv
.
2 - Cálculo Distribuído da Média de um Array
Objetivo: Usar MPI_Scatter para distribuir partes de um array entre os processos, calcular a média local em cada processo e, em seguida, usar MPI_Gather para coletar as médias locais no processo raiz para calcular a média global.
3- Distribuição de Configuração para Cálculos Paralelos
Objetivo: Utilizar MPI_Bcast para enviar uma configuração inicial (por exemplo, o número de iterações para um cálculo) do processo raiz para todos os outros processos.
4- Normalização de um Array em Paralelo
Objetivo: Normalizar um array grande em paralelo. O processo raiz calcula o valor máximo do array e o transmite para todos os outros processos usando MPI_Bcast. Cada processo então usa MPI_Scatter para receber seu segmento do array, normaliza-o, e os resultados são reunidos de volta usando MPI_Gather.
5- Cálculo Distribuído de Desvio Padrão
Objetivo: Calcular o desvio padrão de um conjunto de dados distribuído. O processo raiz distribui o conjunto de dados usando MPI_Scatter, cada processo calcula a média e a variação de sua parte do conjunto, e os resultados são coletados com MPI_Gather para calcular o desvio padrão global.
A entrega da atividade está para as 23h59 de 16/05