VirtualFluids 0.2.0
Parallel CFD LBM Solver
Loading...
Searching...
No Matches
MPICommunicator.cpp
Go to the documentation of this file.
1//=======================================================================================
2// ____ ____ __ ______ __________ __ __ __ __
3// \ \ | | | | | _ \ |___ ___| | | | | / \ | |
4// \ \ | | | | | |_) | | | | | | | / \ | |
5// \ \ | | | | | _ / | | | | | | / /\ \ | |
6// \ \ | | | | | | \ \ | | | \__/ | / ____ \ | |____
7// \ \ | | |__| |__| \__\ |__| \________/ /__/ \__\ |_______|
8// \ \ | | ________________________________________________________________
9// \ \ | | | ______________________________________________________________|
10// \ \| | | | __ __ __ __ ______ _______
11// \ | | |_____ | | | | | | | | | _ \ / _____)
12// \ | | _____| | | | | | | | | | | \ \ \_______
13// \ | | | | |_____ | \_/ | | | | |_/ / _____ |
14// \ _____| |__| |________| \_______/ |__| |______/ (_______/
15//
16// This file is part of VirtualFluids. VirtualFluids is free software: you can
17// redistribute it and/or modify it under the terms of the GNU General Public
18// License as published by the Free Software Foundation, either version 3 of
19// the License, or (at your option) any later version.
20//
21// VirtualFluids is distributed in the hope that it will be useful, but WITHOUT
22// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
24// for more details.
25//
26// SPDX-License-Identifier: GPL-3.0-or-later
27// SPDX-FileCopyrightText: Copyright © VirtualFluids Project contributors, see AUTHORS.md in root folder
28//
32//=======================================================================================
33#include "metis.h"
34#if defined VF_MPI
35#if defined (_WIN32) || defined (_WIN64)
36 #include <Winsock2.h>
37#elif defined (__unix__)
38 #include <unistd.h>
39#endif
40
41#include "MPICommunicator.h"
42
43#include <mpi.h>
44
45#include <sstream>
46
47#include <logger/Logger.h>
48
49using namespace std;
50
51namespace vf::parallel
52{
55{
56 return MPI_Wtime();
57}
58std::shared_ptr<Communicator> MPICommunicator::getInstance()
59{
60 std::lock_guard<std::mutex> myLock(instantiation_mutex);
61 if (!instance) {
62 instance = std::shared_ptr<MPICommunicator>(new MPICommunicator);
63 }
64 return instance;
65}
// Constructor: queries the size of MPI_COMM_WORLD into numprocs, stores MPI_COMM_WORLD
// as the communicator and sets rank 0 as root.
// NOTE(review): this listing skips numbered lines 71, 73 and 75; as shown, mpiInitialized
// is never updated, the if-body is empty and PID is not assigned here. Presumably the
// missing lines hold the MPI initialization check/call and the rank query - confirm
// against the real source file.
67MPICommunicator::MPICommunicator()
68{
69 // check whether MPI is already initialized
70 int mpiInitialized = 0; // false
72 if (mpiInitialized == 0) {
74 }
76 MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
77 comm = MPI_COMM_WORLD;
78 root = 0;
79}
82{
83 // proof if MPI is finalized
84 int _mpiFinalized = 0; // false
86 if (_mpiFinalized == 0) {
88 }
89}
94{
95 vector<string> parts;
97 int scount;
100 MPI_Status status;
101
102 if (PID == root) {
103 rcounts.resize(numprocs - 1);
104 strings.push_back(str);
105
106 for (int i = 1; i < numprocs; i++) {
107 MPI_Recv(&rcounts[i - 1], 1, MPI_INT, i, 0, comm, &status);
108 }
109 for (int i = 1; i < numprocs; i++) {
110 rbuf.resize(rcounts[i - 1]);
111 MPI_Recv(&rbuf[0], rcounts[i - 1], MPI_CHAR, i, 0, comm, &status);
112 string s(&rbuf[0], rcounts[i - 1]);
113 if (s != "")
114 strings.push_back(s);
115 }
116 } else {
117 scount = (int)str.length();
118 MPI_Send(&scount, 1, MPI_INT, root, 0, comm);
119 MPI_Send((char *)str.c_str(), scount, MPI_CHAR, root, 0, comm);
120 }
121 return strings;
122}
130std::vector<unsigned long long> MPICommunicator::gather(std::vector<unsigned long long> &values)
131{
132 return gather<unsigned long long>(values);
133}
135int MPICommunicator::getProcessID() const { return PID; }
137int MPICommunicator::getProcessID(int /*bundle*/, int /*rank*/) const { return PID; }
139int MPICommunicator::getNumberOfProcesses() const { return numprocs; }
143int MPICommunicator::getBundleID() const { return 0; }
145int MPICommunicator::getNumberOfBundles() const { return 1; }
147int MPICommunicator::getRoot() const { return root; }
149int MPICommunicator::getBundleRoot() const { return 0; }
151int MPICommunicator::getProcessRoot() const { return 0; }
153int MPICommunicator::getNumberOfProcessesInBundle(int /*bundle*/) const { return numprocs; }
155bool MPICommunicator::isRoot() const { return PID == root; }
158{
159 string str = ss.str();
160 int scount = static_cast<int>(str.length());
161 MPI_Send(&scount, 1, MPI_INT, target, 0, comm);
162 MPI_Send((char *)str.c_str(), scount, MPI_CHAR, target, 0, comm);
163}
166{
168 int rcount;
169 MPI_Status status;
170 MPI_Recv(&rcount, 1, MPI_INT, source, 0, comm, &status);
171 rbuf.resize(rcount);
172 MPI_Recv(&rbuf[0], rcount, MPI_CHAR, source, 0, comm, &status);
173 ss.rdbuf()->pubsetbuf(&rbuf[0], rcount);
174 string str(&rbuf[0]);
175 ss.str(str);
176}
180void MPICommunicator::allGather(std::vector<int> &svalues, std::vector<int> &rvalues)
181{
183}
185void MPICommunicator::allGather(std::vector<float> &svalues, std::vector<float> &rvalues)
186{
188}
190void MPICommunicator::allGather(std::vector<double> &svalues, std::vector<double> &rvalues)
191{
193}
195void MPICommunicator::allGather(std::vector<unsigned long long> &svalues, std::vector<unsigned long long> &rvalues)
196{
198}
199void MPICommunicator::allGather(std::vector<uint>& svalues, std::vector<uint>& rvalues){ allGather<uint>(svalues, rvalues); }
205void MPICommunicator::broadcast(std::vector<int> &values) { broadcast<int>(values); }
207void MPICommunicator::broadcast(std::vector<float> &values) { broadcast<float>(values); }
209void MPICommunicator::broadcast(std::vector<double> &values) { broadcast<double>(values); }
211void MPICommunicator::broadcast(std::vector<long int> &values) { broadcast<long int>(values); }
213void MPICommunicator::broadcast(int &value) { broadcast<int>(value); }
215void MPICommunicator::broadcast(float &value) { broadcast<float>(value); }
217void MPICommunicator::broadcast(double &value) { broadcast<double>(value); }
219void MPICommunicator::broadcast(long int &value) { broadcast<long int>(value); }
220
223 int neighbor_rank_send) const
224{
227 &recv_request);
228 // printf("receive_send PID: %i, nbRev: nb_rank_recv: %i", this->getPID(), nb_rank_r);
229 // fflush(stdout);
231 // printf("receive_send PID: %i, sendUintGPU: nb_rank_send: %i", this->getPID(), nb_rank_s);
232 // fflush(stdout);
233 MPI_Wait(&recv_request, MPI_STATUSES_IGNORE); // TODO: Do we have a benefit here or could we simply do a blocking receiv.
234}
235
242
244{
246}
247
249{
251 double *buffer_recv = (double *)malloc(sizeof(double));
252
254
255 return *buffer_recv;
256}
257
// Builds, on the root rank, a per-rank table of device ids taken from `devices`, keyed by how
// many lower ranks share the same hostname, then logs and returns `device`.
//   devices         - list of device ids to hand out per host
//   numberOfDevices - upper bound on ranks per host; reaching it logs a critical message and exits
//   returns         - the local variable `device` (initialized to -1)
// NOTE(review): this listing skips numbered lines 268 and 286. As shown, nothing fills `host`
// before the strcmp loop and nothing assigns `device` before it is logged/returned. Presumably
// the missing lines are collective MPI calls (collect hostnames on root / distribute `map`) -
// confirm against the real source file before relying on this description.
258int MPICommunicator::mapCudaDevicesOnHosts(const std::vector<unsigned int> &devices, int numberOfDevices) const
259{
260 int device = -1;
// One 255-byte hostname slot per process.
261 char *host = (char *)malloc(sizeof(char) * getNumberOfProcesses() * 255);
// One device id per process.
262 unsigned int *map = (unsigned int *)malloc(sizeof(unsigned int) * getNumberOfProcesses());
263
264 char hostname[255];
265 gethostname(hostname, 254);
// Force termination of the local hostname buffer.
266 hostname[254] = 0;
267
269
270 int i, j;
271 if (isRoot()) {
272 for (i = 0; i < getNumberOfProcesses(); i++) {
// counter = number of ranks j < i whose hostname slot equals that of rank i.
273 int counter = 0;
274 for (j = 0; j < i; j++) {
275 if (strcmp(&host[i * 255], &host[j * 255]) == 0)
276 counter++;
277 }
// Rank i would need a device index beyond numberOfDevices: terminate the program.
278 if (counter >= numberOfDevices) {
279 VF_LOG_CRITICAL("More processes than GPUs!");
280 exit(1);
281 }
// Rank i gets the counter-th entry of `devices`.
282 map[i] = devices[counter];
283 }
284 }
285
287
288 VF_LOG_INFO("Rank: {} runs on host: {} with GPU: {}", getProcessID(), hostname, device);
289
// Release both temporary tables before returning.
290 free(map);
291 free(host);
292 return device;
293}
294
296{
297 // printf("\n Start Recv Rank: %d, neighbor Rank: %d, request = %d \n", PID, nb_rank, (int)requestGPU.size());
298 // fflush(stdout);
299
302 requests.push_back(request);
303
304 // printf("\n End Recv - Rank: %d , neighbor Rank: %d \n", PID, nb_rank);
305 // fflush(stdout);
306}
307
309{
310 // printf("\n Start Send Rank: %d, neighbor Rank: %d, request = %d \n", PID, nb_rank, (int)requestGPU.size());
311 // fflush(stdout);
312
315 requests.push_back(request);
316 // printf("\n End Send - Rank: %d , neighbor Rank: %d \n", PID, nb_rank);
317 // fflush(stdout);
318}
319
324
326{
327 MPI_Waitall((int)requests.size(), requests.data(), MPI_STATUSES_IGNORE);
328}
329
331{
332 requests.clear();
333}
334
335}
336
337#endif
338
#define VF_LOG_INFO(...)
Definition Logger.h:50
#define VF_LOG_CRITICAL(...)
Definition Logger.h:52
static std::shared_ptr< Communicator > instance
static std::mutex instantiation_mutex
A class that uses the MPI library for communication.
double reduceSum(double quantityPerProcess) const override
void * getNativeCommunicator() override
void allGather(std::vector< int > &svalues, std::vector< int > &rvalues) override
void abort(int errorcode) override
int getProcessID() const override
void receiveNonBlocking(real *rbuf, int count_r, int sourceRank) override
int getBundleRoot() const override
void send(real *sbuf, int count_s, int nb_rank) const override
int getNumberOfBundles() const override
void allReduceSum(std::vector< float > &values) override
int mapCudaDevicesOnHosts(const std::vector< unsigned int > &devices, int numberOfDevices) const override
int getNumberOfProcesses() const override
void receiveSerializedObject(std::stringstream &ss, int source) override
void receiveSend(uint *buffer_receive, int size_buffer_recv, int neighbor_rank_recv, const uint *buffer_send, int size_buffer_send, int neighbor_rank_send) const override
int getProcessRoot() const override
void sendNonBlocking(real *sbuf, int count_s, int destinationRank) override
void broadcast(int &value) override
void sendSerializedObject(std::stringstream &ss, int target) override
std::vector< std::string > gather(const std::string &str) override
static std::shared_ptr< Communicator > getInstance()
int getNumberOfProcessesInBundle(int bundle) const override
std::shared_ptr< T > SPtr
float real
Definition DataTypes.h:42
unsigned int uint
Definition DataTypes.h:47
#define VF_MPI_REAL