preCICE v3.1.2
Loading...
Searching...
No Matches
MPISinglePortsCommunication.cpp
Go to the documentation of this file.
1#ifndef PRECICE_NO_MPI
2
3#include <filesystem>
4#include <memory>
5#include <mpi.h>
6#include <ostream>
7#include <utility>
8
11#include "logging/LogMacros.hpp"
13#include "utils/IntraComm.hpp"
14#include "utils/MPIResult.hpp"
15#include "utils/Parallel.hpp"
16#include "utils/String.hpp"
17#include "utils/assertion.hpp"
18
20
21namespace precice::com {
23 : _addressDirectory(std::move(addressDirectory))
24{
27 }
28}
29
35
37{
40 if (_global != MPI_COMM_NULL) {
41 int size = -1;
42 MPI_Comm_remote_size(_global, &size);
43 return size;
44 } else {
45 return _initialCommSize;
46 }
47}
48
50 std::string const &requesterName,
51 std::string const &tag,
52 int acceptorRank,
53 int rankOffset)
54{
55 PRECICE_TRACE(acceptorName, requesterName);
57
58 setRankOffset(rankOffset);
59
60 _isAcceptor = true;
61
62 MPIResult res;
63
65 res = MPI_Open_port(MPI_INFO_NULL, sm.data());
66 PRECICE_CHECK(res, "MPI_Open_port failed with message: {}", res.message());
67
68 _portName = sm.str();
69
70 ConnectionInfoWriter conPub(acceptorName, requesterName, tag, _addressDirectory);
71 conPub.write(_portName);
72
73 int peerCurrent = 0; // current peer to connect to
74 int peerCount = -1; // The total count of peers (initialized in the first iteration)
75 do {
76 // Connection
78 res = MPI_Comm_accept(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0, MPI_COMM_SELF, &communicator);
79 PRECICE_CHECK(res, "MPI_Comm_accept failed with message: {}", res.message());
80 PRECICE_DEBUG("Accepted connection at {} for peer {}", _portName, peerCurrent);
81
82 // Exchange information to which rank I am connected and which communicator size on the other side
83 int requesterRank = -1;
84 MPI_Recv(&requesterRank, 1, MPI_INT, 0, 42, communicator, MPI_STATUS_IGNORE);
85 int requesterCommunicatorSize = -1;
86 MPI_Recv(&requesterCommunicatorSize, 1, MPI_INT, 0, 42, communicator, MPI_STATUS_IGNORE);
87 MPI_Send(&acceptorRank, 1, MPI_INT, 0, 42, communicator);
88
89 // Initialize the count of peers to connect to
90 if (peerCurrent == 0) {
91 peerCount = requesterCommunicatorSize;
92 }
93
94 PRECICE_ASSERT(requesterCommunicatorSize > 0,
95 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
96 PRECICE_ASSERT(requesterCommunicatorSize == peerCount,
97 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
98 PRECICE_ASSERT(_direct.count(requesterRank) == 0,
99 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
100
101 _direct.emplace(requesterRank, communicator);
102
103 PRECICE_ASSERT(peerCount > 0);
104 } while (++peerCurrent < peerCount);
105
106 _initialCommSize = peerCount;
107 _isConnected = true;
108}
109
112 std::string const &requesterName,
113 std::string const &tag,
114 int acceptorRank,
115 int requesterCommunicatorSize)
116{
117 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
119
120 _isAcceptor = true;
121
122 const Rank rank = utils::Parallel::current()->rank();
123 MPIResult res;
124
125 if (rank == 0) { // only primary rank opens a port
126 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, _addressDirectory);
127
129 res = MPI_Open_port(MPI_INFO_NULL, sm.data());
130 PRECICE_CHECK(res, "MPI_Open_port failed with message: {}", res.message());
131 _portName = sm.str();
132
133 conInfo.write(_portName);
134 PRECICE_DEBUG("Accept connection at {}", _portName);
135
136 res = MPI_Comm_accept(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0, utils::Parallel::current()->comm, &_global);
137 PRECICE_CHECK(res, "MPI_Comm_accept failed with message: {}", res.message());
138 PRECICE_DEBUG("Accepted connection at {}", _portName);
139
140 } else { // Secondary ranks call simply call accept
141
142 // The port is only used on the root rank
143 res = MPI_Comm_accept(nullptr, MPI_INFO_NULL, 0, utils::Parallel::current()->comm, &_global);
144 PRECICE_CHECK(res, "MPI_Comm_accept failed with message: {}", res.message());
145 PRECICE_DEBUG("Accepted connection");
146 }
147
148 _isConnected = true;
149}
150
152 std::string const &requesterName,
153 std::string const &tag,
154 int requesterRank,
155 int requesterCommunicatorSize)
156{
157 PRECICE_TRACE(acceptorName, requesterName);
159 _isAcceptor = false;
160
161 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, _addressDirectory);
162 _portName = conInfo.read();
163 PRECICE_DEBUG("Request connection to {}", _portName);
164
166 MPIResult res = MPI_Comm_connect(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0, MPI_COMM_SELF, &communicator);
167 PRECICE_CHECK(res, "MPI_Open_port failed with message: {}", res.message());
168 PRECICE_DEBUG("Requested connection to {}", _portName);
169
170 // Send the rank of this requester
171 MPI_Send(&requesterRank, 1, MPI_INT, 0, 42, communicator);
172 // Send the rank of this requesters communicator size
173 MPI_Send(&requesterCommunicatorSize, 1, MPI_INT, 0, 42, communicator);
174 // Receive the acceptorRank, which should always be 0
175 int acceptorRank = -1;
176 MPI_Recv(&acceptorRank, 1, MPI_INT, 0, 42, communicator, MPI_STATUS_IGNORE);
177 PRECICE_ASSERT(acceptorRank == 0);
178
179 _direct.emplace(acceptorRank, communicator);
180
181 _initialCommSize = requesterCommunicatorSize;
182 _isConnected = true;
183}
184
186 std::string const & requesterName,
187 std::string const & tag,
188 std::set<int> const &acceptorRanks,
189 int requesterRank)
190{
191 PRECICE_TRACE(acceptorName, requesterName);
193
194 _isAcceptor = false;
195
196 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, _addressDirectory);
197 _portName = conInfo.read();
198 PRECICE_DEBUG("Request connection to {}", _portName);
199
200 MPIResult res = MPI_Comm_connect(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0,
202 PRECICE_CHECK(res, "MPI_Open_port failed with message: {}", res.message());
203 PRECICE_DEBUG("Requested connection to {}", _portName);
204
205 _isConnected = true;
206}
207
209{
210 PRECICE_TRACE(_direct.size());
211
212 if (not isConnected())
213 return;
214 MPIResult res;
215
216 for (auto &kv : _direct) {
217 res = MPI_Comm_disconnect(&kv.second);
218 PRECICE_WARN_IF(!res, "MPI_Open_port failed with message: {}", res.message());
219 }
220 _direct.clear();
221 if (_global != MPI_COMM_NULL) {
222 res = MPI_Comm_disconnect(&_global);
223 PRECICE_WARN_IF(!res, "MPI_Open_port failed with message: {}", res.message());
224 }
225
226 PRECICE_DEBUG("Disconnected");
227
228 if (_isAcceptor and utils::IntraComm::getRank() == 0) {
229 res = MPI_Close_port(const_cast<char *>(_portName.c_str()));
230 PRECICE_WARN_IF(!res, "MPI_Open_port failed with message: {}", res.message());
232 PRECICE_DEBUG("Port closed");
233 }
234
235 _initialCommSize = -1;
236 _isConnected = false;
237}
238
240{
241 if (_global != MPI_COMM_NULL) {
242 // Always prefer the global communicator
243 return _global;
244 } else {
245 // Use a direct communication if required
246 return _direct.at(rank);
247 }
248}
249
251{
252 if (_global != MPI_COMM_NULL) {
253 // Always prefer the global communicator
254 return rank;
255 } else {
256 // Use a direct communication if required.
257 // In this case the other rank is always 0.
258 return 0;
259 }
260}
261
263 std::string const &requesterName)
264{
265 using namespace std::filesystem;
266 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
267 PRECICE_DEBUG("Creating connection exchange directory {}", dir.generic_string());
268 try {
269 create_directories(dir);
270 } catch (const std::filesystem::filesystem_error &e) {
271 PRECICE_WARN("Creating directory for connection info failed with: {}", e.what());
272 }
273}
274
276 std::string const &requesterName)
277{
278 using namespace std::filesystem;
279 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
280 PRECICE_DEBUG("Removing connection exchange directory {}", dir.generic_string());
281 try {
282 remove_all(dir);
283 } catch (const std::filesystem::filesystem_error &e) {
284 PRECICE_WARN("Cleaning up connection info failed with: {}", e.what());
285 }
286}
287
288} // namespace precice::com
289
290#endif // not PRECICE_NO_MPI
#define PRECICE_WARN_IF(condition,...)
Definition LogMacros.hpp:21
#define PRECICE_WARN(...)
Definition LogMacros.hpp:11
#define PRECICE_DEBUG(...)
Definition LogMacros.hpp:64
#define PRECICE_TRACE(...)
Definition LogMacros.hpp:95
#define PRECICE_CHECK(check,...)
Definition LogMacros.hpp:35
#define PRECICE_ASSERT(...)
Definition assertion.hpp:87
T c_str(T... args)
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true, if a connection to a remote participant has been setup.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block, if the the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Write the string info, e.g. IP:port to the connection info file.
virtual void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
virtual void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
MPI_Comm _global
The global inter-communicator that connects all ranks.
virtual void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
virtual MPI_Comm & communicator(Rank rank) override
Returns the communicator.
std::string _portName
Name of the port used for connection.
virtual size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
int _initialCommSize
The communicator size known from acceptConnection and requestConnection.
virtual void closeConnection() override
Disconnects from communication space, i.e. participant.
virtual void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
requesterCommunicatorSize is not used, since connection is always made on the entire communicator
std::map< int, MPI_Comm > _direct
A map of direct communication channels based on MPI_COMM_SELF on both sides.
virtual void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
virtual void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
MPISinglePortsCommunication(std::string addressDirectory=".")
static Rank getRank()
Current rank.
Definition IntraComm.cpp:42
static CommStatePtr current()
Returns an owning pointer to the current CommState.
Definition Parallel.cpp:147
Utility class to build a string from C functions with output pointers and static maximum length.
Definition String.hpp:14
std::string str() const
Definition String.hpp:35
T clear(T... args)
T empty(T... args)
T generic_string(T... args)
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.
int Rank
Definition Types.hpp:37
STL namespace.
std::string message() const
Definition MPIResult.hpp:39