MPIPortsCommunication.cpp
#ifndef PRECICE_NO_MPI

#include <filesystem>
#include <ostream>
#include <utility>

#include "ConnectionInfoPublisher.hpp"
#include "MPIPortsCommunication.hpp"
#include "logging/LogMacros.hpp"
#include "precice/impl/Types.hpp"
#include "utils/MPIResult.hpp"
#include "utils/String.hpp"
#include "utils/assertion.hpp"

using precice::utils::MPIResult;

namespace precice::com {
MPIPortsCommunication::MPIPortsCommunication(std::string addressDirectory)
    : _addressDirectory(std::move(addressDirectory))
{
  if (_addressDirectory.empty()) {
    _addressDirectory = ".";
  }
}

MPIPortsCommunication::~MPIPortsCommunication()
{
  PRECICE_TRACE(_isConnected);
  closeConnection();
}

size_t MPIPortsCommunication::getRemoteCommunicatorSize()
{
  PRECICE_TRACE();
  PRECICE_ASSERT(isConnected());
  return _communicators.size();
}

void MPIPortsCommunication::acceptConnection(std::string const &acceptorName,
                                             std::string const &requesterName,
                                             std::string const &tag,
                                             int                acceptorRank,
                                             int                rankOffset)
{
  PRECICE_TRACE(acceptorName, requesterName, acceptorRank);
  PRECICE_ASSERT(not isConnected());

  setRankOffset(rankOffset);

  _isAcceptor = true;

  MPIResult res;

  utils::StringMaker<MPI_MAX_PORT_NAME> sm;
  res = MPI_Open_port(MPI_INFO_NULL, sm.data());
  PRECICE_CHECK(res, "MPI_Open_port failed with message: {}", res.message());
  _portName = sm.str();

  ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, _addressDirectory);
  conInfo.write(_portName);
  PRECICE_DEBUG("Accept connection at {}", _portName);

  int peerCount   = -1; // The total count of peers (initialized in the first iteration)
  int peerCurrent = 0;  // Current peer to connect to
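  // Accept one connection per requesting rank: every MPI_Comm_accept() on
  // MPI_COMM_SELF yields a separate inter-communicator whose remote peer is
  // rank 0, and the rank/size handshake below uses message tag 42.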
  do {
    // Connection
    MPI_Comm communicator;
    res = MPI_Comm_accept(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0, MPI_COMM_SELF, &communicator);
    PRECICE_CHECK(res, "MPI_Comm_accept failed with message: {}", res.message());
    PRECICE_DEBUG("Accepted connection at {} for peer {}", _portName, peerCurrent);

    // Which rank is requesting a connection?
    int requesterRank = -1;
    MPI_Recv(&requesterRank, 1, MPI_INT, 0, 42, communicator, MPI_STATUS_IGNORE);
    // How big is the communicator of the requester?
    int requesterCommunicatorSize = -1;
    MPI_Recv(&requesterCommunicatorSize, 1, MPI_INT, 0, 42, communicator, MPI_STATUS_IGNORE);
    // Send the rank of the acceptor (this rank).
    MPI_Send(&acceptorRank, 1, MPI_INT, 0, 42, communicator);

    // Initialize the count of peers to connect to
    if (peerCurrent == 0) {
      peerCount = requesterCommunicatorSize;
    }

    PRECICE_ASSERT(requesterCommunicatorSize > 0,
                   "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
    PRECICE_ASSERT(requesterCommunicatorSize == peerCount,
                   "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
    PRECICE_ASSERT(_communicators.count(requesterRank) == 0,
                   "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);

    _communicators.emplace(requesterRank, communicator);

  } while (++peerCurrent < peerCount);

  res = MPI_Close_port(const_cast<char *>(_portName.c_str()));
  PRECICE_CHECK(res, "MPI_Close_port failed with message: {}", res.message());
  _portName.clear();
  PRECICE_DEBUG("Closed Port");

  _isConnected = true;
}

void MPIPortsCommunication::acceptConnectionAsServer(std::string const &acceptorName,
                                                     std::string const &requesterName,
                                                     std::string const &tag,
                                                     int                acceptorRank,
                                                     int                requesterCommunicatorSize)
{
  PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
  PRECICE_ASSERT(requesterCommunicatorSize >= 0, "Requester communicator size has to be positive.");
  PRECICE_ASSERT(not isConnected());

  _isAcceptor = true;
  MPIResult res;

  utils::StringMaker<MPI_MAX_PORT_NAME> sm;
  res = MPI_Open_port(MPI_INFO_NULL, sm.data());
  PRECICE_CHECK(res, "MPI_Open_port failed with message: {}", res.message());
  _portName = sm.str();

  ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
  conInfo.write(_portName);
  PRECICE_DEBUG("Accept connection at {}", _portName);

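  // Unlike acceptConnection(), each acceptor rank publishes its own
  // rank-specific connection-info file and accepts exactly
  // requesterCommunicatorSize connections, one per requesting rank.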
  for (int connection = 0; connection < requesterCommunicatorSize; ++connection) {
    MPI_Comm communicator;
    res = MPI_Comm_accept(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0, MPI_COMM_SELF, &communicator);
    PRECICE_CHECK(res, "MPI_Comm_accept failed with message: {}", res.message());
    PRECICE_DEBUG("Accepted connection at {}", _portName);

    // Receive the rank of the requester
    int requesterRank = -1;
    MPI_Recv(&requesterRank, 1, MPI_INT, 0, 42, communicator, MPI_STATUS_IGNORE);
    PRECICE_ASSERT(requesterRank >= 0, "Invalid requester rank!");

    PRECICE_ASSERT(_communicators.count(requesterRank) == 0, "This connection has already been established.");

    _communicators.emplace(requesterRank, communicator);
  }

  res = MPI_Close_port(const_cast<char *>(_portName.c_str()));
  PRECICE_CHECK(res, "MPI_Close_port failed with message: {}", res.message());
  _portName.clear();
  PRECICE_DEBUG("Closed Port");

  _isConnected = true;
}

void MPIPortsCommunication::requestConnection(std::string const &acceptorName,
                                              std::string const &requesterName,
                                              std::string const &tag,
                                              int                requesterRank,
                                              int                requesterCommunicatorSize)
{
  PRECICE_TRACE(acceptorName, requesterName);
  PRECICE_ASSERT(not isConnected());
  _isAcceptor = false;

  ConnectionInfoReader conInfo(acceptorName, requesterName, tag, _addressDirectory);
  _portName = conInfo.read();

  PRECICE_DEBUG("Request connection to {}", _portName);

  MPI_Comm  communicator;
  MPIResult res = MPI_Comm_connect(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0, MPI_COMM_SELF, &communicator);
  PRECICE_CHECK(res, "MPI_Comm_connect failed with message: {}", res.message());
  PRECICE_DEBUG("Requested connection to {}", _portName);

  _isConnected = true;

  // Send the rank of the requester (this rank)
  MPI_Send(&requesterRank, 1, MPI_INT, 0, 42, communicator);
  // Send the size of the requester communicator
  MPI_Send(&requesterCommunicatorSize, 1, MPI_INT, 0, 42, communicator);
  // Receive the rank of the acceptor that we connected to.
  int acceptorRank = -1;
  MPI_Recv(&acceptorRank, 1, MPI_INT, 0, 42, communicator, MPI_STATUS_IGNORE);
  // @todo The following assertion should always hold; however, the
  // acceleration package currently violates it in order to create a circular
  // intra communication.
  //
  // PRECICE_ASSERT(acceptorRank == 0, "The acceptor always has to be 0.");

  _communicators.emplace(acceptorRank, communicator);
}

void MPIPortsCommunication::requestConnectionAsClient(std::string const &  acceptorName,
                                                      std::string const &  requesterName,
                                                      std::string const &  tag,
                                                      std::set<int> const &acceptorRanks,
                                                      int                  requesterRank)

{
  PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
  PRECICE_ASSERT(not isConnected());

  _isAcceptor = false;

  for (int acceptorRank : acceptorRanks) {
    ConnectionInfoReader conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
    _portName = conInfo.read();
    PRECICE_DEBUG("Request connection to {}", _portName);

    MPI_Comm  communicator;
    MPIResult res = MPI_Comm_connect(const_cast<char *>(_portName.c_str()), MPI_INFO_NULL, 0, MPI_COMM_SELF, &communicator);
    PRECICE_CHECK(res, "MPI_Comm_connect failed with message: {}", res.message());
    PRECICE_DEBUG("Requested connection to {}", _portName);

    // Rank 0 is always the peer, because we connected on COMM_SELF
    MPI_Send(&requesterRank, 1, MPI_INT, 0, 42, communicator);

    PRECICE_ASSERT(_communicators.count(acceptorRank) == 0, "This connection has already been established.");
    _communicators.emplace(acceptorRank, communicator);
  }
  _isConnected = true;
}

void MPIPortsCommunication::closeConnection()
{
  PRECICE_TRACE(_communicators.size());

  if (not isConnected())
    return;

  for (auto &communicator : _communicators) {
    MPIResult res = MPI_Comm_disconnect(&communicator.second);
    PRECICE_WARN_IF(!res,
                    "MPI_Comm_disconnect failed with message: {}", res.message());
  }
  _communicators.clear();

  PRECICE_DEBUG("Disconnected");

  _isConnected = false;
}

void MPIPortsCommunication::prepareEstablishment(std::string const &acceptorName,
                                                 std::string const &requesterName)
{
  using namespace std::filesystem;
  path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
  PRECICE_DEBUG("Creating connection exchange directory {}", dir.generic_string());
  try {
    create_directories(dir);
  } catch (const std::filesystem::filesystem_error &e) {
    PRECICE_WARN("Creating directory for connection info failed with: {}", e.what());
  }
}

void MPIPortsCommunication::cleanupEstablishment(std::string const &acceptorName,
                                                 std::string const &requesterName)
{
  using namespace std::filesystem;
  path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
  PRECICE_DEBUG("Removing connection exchange directory {}", dir.generic_string());
  try {
    remove_all(dir);
  } catch (const std::filesystem::filesystem_error &e) {
    PRECICE_WARN("Cleaning up connection info failed with: {}", e.what());
  }
}

MPI_Comm &MPIPortsCommunication::communicator(Rank rank)
{
  // Use bounds checking here, because a std::map would otherwise create the element.
  return _communicators.at(rank);
}

Rank MPIPortsCommunication::rank(int rank)
{
  // The remote peer is always rank 0, because each connection was accepted on MPI_COMM_SELF.
  return 0;
}

} // namespace precice::com

#endif // not PRECICE_NO_MPI
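For orientation, the following is a minimal usage sketch of how the two sides of this class pair up, based on the member functions defined above: the acceptor opens a port and publishes it via a connection-info file, the requester reads that file and connects, and both sides finally call closeConnection(). The participant names "Acceptor" and "Requester", the tag "example", the single-rank setup, and the driver functions runAcceptor()/runRequester() are illustrative assumptions, not part of preCICE; MPI must already be initialized, and both processes must see the same address directory.

#include "com/MPIPortsCommunication.hpp" // assumed include path within the preCICE source tree

void runAcceptor() // hypothetical driver for the accepting participant
{
  precice::com::MPIPortsCommunication com("."); // "." = shared address directory
  com.prepareEstablishment("Acceptor", "Requester");          // create the exchange directory
  com.acceptConnection("Acceptor", "Requester", "example",
                       /*acceptorRank=*/0, /*rankOffset=*/0); // blocks until the requester connects
  // ... exchange data over the established communicator ...
  com.closeConnection();
  com.cleanupEstablishment("Acceptor", "Requester");          // remove the exchange directory
}

void runRequester() // hypothetical driver for the requesting participant
{
  precice::com::MPIPortsCommunication com(".");
  com.requestConnection("Acceptor", "Requester", "example",
                        /*requesterRank=*/0, /*requesterCommunicatorSize=*/1);
  // ... exchange data over the established communicator ...
  com.closeConnection();
}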