preCICE v3.2.0
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
SocketCommunication.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <boost/asio.hpp>
3
4#include <filesystem>
5#include <sstream>
6#include <stdexcept>
7#include <utility>
8
11#include "SocketRequest.hpp"
12#include "logging/LogMacros.hpp"
14#include "utils/assertion.hpp"
15#include "utils/networking.hpp"
16#include "utils/span_tools.hpp"
17
18namespace precice::com {
19
20namespace asio = boost::asio;
21
23 bool reuseAddress,
24 std::string networkName,
25 std::string addressDirectory)
26 : _portNumber(portNumber),
27 _reuseAddress(reuseAddress),
28 _networkName(std::move(networkName)),
29 _addressDirectory(std::move(addressDirectory)),
30 _ioContext(new IOContext)
31{
34 }
35}
36
38 : SocketCommunication(0, false, utils::networking::loopbackInterfaceName(), addressDirectory)
39{
40}
41
47
54
56 std::string const &requesterName,
57 std::string const &tag,
58 int acceptorRank,
59 int rankOffset)
60{
61 PRECICE_TRACE(acceptorName, requesterName);
62
64
65 setRankOffset(rankOffset);
66
68
69 try {
70 std::string ipAddress = getIpAddress();
71 PRECICE_CHECK(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
72
73 using asio::ip::tcp;
74
75 tcp::acceptor acceptor(*_ioContext);
76 tcp::endpoint endpoint(tcp::v4(), _portNumber);
77
78 acceptor.open(endpoint.protocol());
79 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
80 acceptor.bind(endpoint);
81 acceptor.listen();
82
83 _portNumber = acceptor.local_endpoint().port();
84 address = ipAddress + ":" + std::to_string(_portNumber);
85 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, _addressDirectory);
86 conInfo.write(address);
87 PRECICE_DEBUG("Accept connection at {}", address);
88
89 int peerCurrent = 0; // Current peer to connect to
90 int peerCount = -1; // The total count of peers (initialized in the first iteration)
91 int requesterCommunicatorSize = -1;
92
93 do {
95
96 acceptor.accept(*socket);
97 boost::asio::ip::tcp::no_delay option(true);
98 socket->set_option(option);
99
100 PRECICE_DEBUG("Accepted connection at {}", address);
101 _isConnected = true;
102
103 int requesterRank = -1;
104
105 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
106
107 PRECICE_ASSERT(_sockets.count(requesterRank) == 0,
108 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
109
110 _sockets[requesterRank] = std::move(socket);
111 // send and receive expect a rank from the acceptor perspective.
112 // Thus we need to apply given rankOffset before passing it to send/receive.
113 // This is essentially the inverse of adjustRank().
114 auto adjustedRequesterRank = requesterRank + rankOffset;
115 send(acceptorRank, adjustedRequesterRank);
116 receive(requesterCommunicatorSize, adjustedRequesterRank);
117
118 // Initialize the count of peers to connect to
119 if (peerCurrent == 0) {
120 peerCount = requesterCommunicatorSize;
121 }
122
123 PRECICE_ASSERT(requesterCommunicatorSize > 0,
124 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
125 PRECICE_ASSERT(requesterCommunicatorSize == peerCount,
126 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
127 } while (++peerCurrent < requesterCommunicatorSize);
128
129 acceptor.close();
130 } catch (std::exception &e) {
131 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
132 }
133
134 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
135 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
136 _thread = std::thread([this] { _ioContext->run(); });
137}
138
140 std::string const &requesterName,
141 std::string const &tag,
142 int acceptorRank,
143 int requesterCommunicatorSize)
144{
145 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
146 PRECICE_ASSERT(requesterCommunicatorSize >= 0, "Requester communicator size has to be positive.");
148
149 if (requesterCommunicatorSize == 0) {
150 PRECICE_DEBUG("Accepting no connections.");
151 _isConnected = true;
152 return;
153 }
154
156
157 try {
158 std::string ipAddress = getIpAddress();
159 PRECICE_ASSERT(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
160
161 using asio::ip::tcp;
162
163 tcp::acceptor acceptor(*_ioContext);
164 {
165 tcp::endpoint endpoint(tcp::v4(), _portNumber);
166
167 acceptor.open(endpoint.protocol());
168 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
169 acceptor.bind(endpoint);
170 acceptor.listen();
171
172 _portNumber = acceptor.local_endpoint().port();
173 }
174
175 address = ipAddress + ":" + std::to_string(_portNumber);
176 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
177 conInfo.write(address);
178
179 PRECICE_DEBUG("Accepting connection at {}", address);
180
181 for (int connection = 0; connection < requesterCommunicatorSize; ++connection) {
182 auto socket = std::make_shared<Socket>(*_ioContext);
183 acceptor.accept(*socket);
184 boost::asio::ip::tcp::no_delay option(true);
185 socket->set_option(option);
186 PRECICE_DEBUG("Accepted connection at {}", address);
187 _isConnected = true;
188
189 int requesterRank;
190 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
191 _sockets[requesterRank] = std::move(socket);
192 }
193
194 acceptor.close();
195 } catch (std::exception &e) {
196 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
197 }
198
199 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
200 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
201 _thread = std::thread([this] { _ioContext->run(); });
202}
203
205 std::string const &requesterName,
206 std::string const &tag,
207 int requesterRank,
208 int requesterCommunicatorSize)
209{
210 PRECICE_TRACE(acceptorName, requesterName);
212
213 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, _addressDirectory);
214 std::string const address = conInfo.read();
215 PRECICE_DEBUG("Request connection to {}", address);
216 auto const sepidx = address.find(':');
217 std::string const ipAddress = address.substr(0, sepidx);
218 std::string const portNumber = address.substr(sepidx + 1);
219 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
220
221 try {
222 auto socket = std::make_shared<Socket>(*_ioContext);
223
224 using asio::ip::tcp;
225
226 while (not isConnected()) {
227 tcp::resolver resolver(*_ioContext);
228 auto results = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
229
230 auto endpoint = results.begin()->endpoint();
231 boost::system::error_code error = asio::error::host_not_found;
232 socket->connect(endpoint, error);
233
234 _isConnected = not error;
235
236 if (not isConnected()) {
237 // Wait a little, since after a couple of ten-thousand trials the system
238 // seems to get confused and the requester connects wrongly to itself.
239 boost::asio::deadline_timer timer(*_ioContext, boost::posix_time::milliseconds(1));
240 timer.wait();
241 }
242 }
243 boost::asio::ip::tcp::no_delay option(true);
244 socket->set_option(option);
245
246 PRECICE_DEBUG("Requested connection to {}", address);
247
248 asio::write(*socket, asio::buffer(&requesterRank, sizeof(int)));
249
250 int acceptorRank = -1;
251 asio::read(*socket, asio::buffer(&acceptorRank, sizeof(int)));
252 _sockets[0] = std::move(socket); // should be acceptorRank instead of 0, likewise all communication below
253
254 send(requesterCommunicatorSize, 0);
255
256 } catch (std::exception &e) {
257 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
258 }
259
260 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
261 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
262 _thread = std::thread([this] { _ioContext->run(); });
263}
264
266 std::string const &requesterName,
267 std::string const &tag,
268 std::set<int> const &acceptorRanks,
269 int requesterRank)
270
271{
272 PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
274
275 for (auto const &acceptorRank : acceptorRanks) {
276 _isConnected = false;
277 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
278 std::string const address = conInfo.read();
279 auto const sepidx = address.find(':');
280 std::string const ipAddress = address.substr(0, sepidx);
281 std::string const portNumber = address.substr(sepidx + 1);
282 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
283
284 try {
285 auto socket = std::make_shared<Socket>(*_ioContext);
286
287 using asio::ip::tcp;
288
289 PRECICE_DEBUG("Requesting connection to {}, port {}", ipAddress, portNumber);
290
291 while (not isConnected()) {
292 tcp::resolver resolver(*_ioContext);
293 auto endpoints = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
294
295 boost::system::error_code error = asio::error::host_not_found;
296 boost::asio::connect(*socket, endpoints, error);
297
298 _isConnected = not error;
299
300 if (not isConnected()) {
301 // Wait a little, since after a couple of ten-thousand trials the system
302 // seems to get confused and the requester connects wrongly to itself.
303 boost::asio::deadline_timer timer(*_ioContext, boost::posix_time::milliseconds(1));
304 timer.wait();
305 }
306 }
307 boost::asio::ip::tcp::no_delay option(true);
308 socket->set_option(option);
309
310 PRECICE_DEBUG("Requested connection to {}, rank = {}", address, acceptorRank);
311 _sockets[acceptorRank] = std::move(socket);
312 send(requesterRank, acceptorRank); // send my rank
313
314 } catch (std::exception &e) {
315 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
316 }
317 }
318 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
319 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
320 _thread = std::thread([this] { _ioContext->run(); });
321}
322
324{
326
327 if (not isConnected())
328 return;
329
330 if (_thread.joinable()) {
332 _ioContext->stop();
333 _thread.join();
334 }
335
336 for (auto &socket : _sockets) {
337 PRECICE_ASSERT(socket.second->is_open());
338
339 try {
340 socket.second->shutdown(Socket::shutdown_send);
341 socket.second->close();
342 } catch (std::exception &e) {
343 PRECICE_WARN("Socket shutdown failed with system error: {}", e.what());
344 }
345 }
346
347 _isConnected = false;
348}
349
350void SocketCommunication::send(std::string const &itemToSend, Rank rankReceiver)
351{
352 PRECICE_TRACE(itemToSend, rankReceiver);
353
354 rankReceiver = adjustRank(rankReceiver);
355
356 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
358
359 size_t size = itemToSend.size() + 1;
360 try {
361 asio::write(*_sockets[rankReceiver], asio::buffer(&size, sizeof(size_t)));
362 asio::write(*_sockets[rankReceiver], asio::buffer(itemToSend.c_str(), size));
363 } catch (std::exception &e) {
364 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
365 }
366}
367
369{
370 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
371
372 rankReceiver = adjustRank(rankReceiver);
373
374 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
376
377 try {
378 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)));
379 } catch (std::exception &e) {
380 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
381 }
382}
383
385 std::string const &requesterName)
386{
387 using namespace std::filesystem;
388 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
389 PRECICE_DEBUG("Creating connection exchange directory {}", dir.generic_string());
390 try {
391 create_directories(dir);
392 } catch (const std::filesystem::filesystem_error &e) {
393 PRECICE_WARN("Creating directory for connection info failed with filesystem error: {}", e.what());
394 }
395}
396
398 std::string const &requesterName)
399{
400 using namespace std::filesystem;
401 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
402 PRECICE_DEBUG("Removing connection exchange directory {}", dir.generic_string());
403 try {
404 remove_all(dir);
405 } catch (const std::filesystem::filesystem_error &e) {
406 PRECICE_WARN("Cleaning up connection info failed with filesystem error {}", e.what());
407 }
408}
409
411{
412 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
413
414 rankReceiver = adjustRank(rankReceiver);
415
416 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
418
419 PtrRequest request(new SocketRequest);
420
421 _queue.dispatch(_sockets[rankReceiver],
422 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)),
423 [request] {
424 std::static_pointer_cast<SocketRequest>(request)->complete();
425 });
426 return request;
427}
428
430{
431 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
432
433 rankReceiver = adjustRank(rankReceiver);
434
435 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
437
438 try {
439 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)));
440 } catch (std::exception &e) {
441 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
442 }
443}
444
446{
447 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
448
449 rankReceiver = adjustRank(rankReceiver);
450
451 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
453
454 PtrRequest request(new SocketRequest);
455
456 _queue.dispatch(_sockets[rankReceiver],
457 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)),
458 [request] {
459 std::static_pointer_cast<SocketRequest>(request)->complete();
460 });
461 return request;
462}
463
464void SocketCommunication::send(double itemToSend, Rank rankReceiver)
465{
466 PRECICE_TRACE(itemToSend, rankReceiver);
467
468 rankReceiver = adjustRank(rankReceiver);
469
470 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
472
473 try {
474 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(double)));
475 } catch (std::exception &e) {
476 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
477 }
478}
479
480PtrRequest SocketCommunication::aSend(const double &itemToSend, Rank rankReceiver)
481{
482 return aSend(precice::refToSpan<const double>(itemToSend), rankReceiver);
483}
484
485void SocketCommunication::send(int itemToSend, Rank rankReceiver)
486{
487 PRECICE_TRACE(itemToSend, rankReceiver);
488
489 rankReceiver = adjustRank(rankReceiver);
490
491 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
493
494 try {
495 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(int)));
496 } catch (std::exception &e) {
497 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
498 }
499}
500
501PtrRequest SocketCommunication::aSend(const int &itemToSend, Rank rankReceiver)
502{
503 return aSend(precice::refToSpan<const int>(itemToSend), rankReceiver);
504}
505
506void SocketCommunication::send(bool itemToSend, Rank rankReceiver)
507{
508 PRECICE_TRACE(itemToSend, rankReceiver);
509
510 rankReceiver = adjustRank(rankReceiver);
511
512 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
514
515 try {
516 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(bool)));
517 } catch (std::exception &e) {
518 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
519 }
520}
521
522PtrRequest SocketCommunication::aSend(const bool &itemToSend, Rank rankReceiver)
523{
524 PRECICE_TRACE(rankReceiver);
525
526 rankReceiver = adjustRank(rankReceiver);
527
528 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
530
531 PtrRequest request(new SocketRequest);
532
533 _queue.dispatch(_sockets[rankReceiver],
534 asio::buffer(&itemToSend, sizeof(bool)),
535 [request] {
536 std::static_pointer_cast<SocketRequest>(request)->complete();
537 });
538 return request;
539}
540
541void SocketCommunication::receive(std::string &itemToReceive, Rank rankSender)
542{
543 PRECICE_TRACE(rankSender);
544
545 rankSender = adjustRank(rankSender);
546
547 PRECICE_ASSERT(rankSender >= 0, rankSender);
549
550 size_t size = 0;
551
552 try {
553 asio::read(*_sockets[rankSender], asio::buffer(&size, sizeof(size_t)));
554 std::vector<char> msg(size);
555 asio::read(*_sockets[rankSender], asio::buffer(msg.data(), size));
556 itemToReceive = msg.data();
557 } catch (std::exception &e) {
558 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
559 }
560}
561
563{
564 PRECICE_TRACE(itemsToReceive.size(), rankSender);
565
566 rankSender = adjustRank(rankSender);
567
568 PRECICE_ASSERT(rankSender >= 0, rankSender);
570
571 try {
572 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(int)));
573 } catch (std::exception &e) {
574 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
575 }
576}
577
579{
580 PRECICE_TRACE(itemsToReceive.size(), rankSender);
581
582 rankSender = adjustRank(rankSender);
583
584 PRECICE_ASSERT(rankSender >= 0, rankSender);
586
587 try {
588 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)));
589 } catch (std::exception &e) {
590 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
591 }
592}
593
595 int rankSender)
596{
597 PRECICE_TRACE(itemsToReceive.size(), rankSender);
598
599 rankSender = adjustRank(rankSender);
600
601 PRECICE_ASSERT(rankSender >= 0, rankSender);
603
604 PtrRequest request(new SocketRequest);
605
606 try {
607 asio::async_read(*_sockets[rankSender],
608 asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)),
609 [request](boost::system::error_code const &, std::size_t) {
610 std::static_pointer_cast<SocketRequest>(request)->complete();
611 });
612 } catch (std::exception &e) {
613 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
614 }
615
616 return request;
617}
618
619void SocketCommunication::receive(double &itemToReceive, Rank rankSender)
620{
621 PRECICE_TRACE(rankSender);
622
623 rankSender = adjustRank(rankSender);
624
625 PRECICE_ASSERT(rankSender >= 0, rankSender);
627
628 try {
629 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(double)));
630 } catch (std::exception &e) {
631 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
632 }
633}
634
635PtrRequest SocketCommunication::aReceive(double &itemToReceive, Rank rankSender)
636{
637 return aReceive(precice::refToSpan<double>(itemToReceive), rankSender);
638}
639
640void SocketCommunication::receive(int &itemToReceive, Rank rankSender)
641{
642 PRECICE_TRACE(rankSender);
643
644 rankSender = adjustRank(rankSender);
645
646 PRECICE_ASSERT(rankSender >= 0, rankSender);
648
649 try {
650 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(int)));
651 } catch (std::exception &e) {
652 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
653 }
654}
655
656PtrRequest SocketCommunication::aReceive(int &itemToReceive, Rank rankSender)
657{
658 PRECICE_TRACE(rankSender);
659
660 rankSender = adjustRank(rankSender);
661
662 PRECICE_ASSERT((rankSender >= 0) && (rankSender < (int) _sockets.size()),
663 rankSender, _sockets.size());
665
666 PtrRequest request(new SocketRequest);
667
668 try {
669 asio::async_read(*_sockets[rankSender],
670 asio::buffer(&itemToReceive, sizeof(int)),
671 [request](boost::system::error_code const &, std::size_t) {
672 std::static_pointer_cast<SocketRequest>(request)->complete();
673 });
674 } catch (std::exception &e) {
675 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
676 }
677
678 return request;
679}
680
681void SocketCommunication::receive(bool &itemToReceive, Rank rankSender)
682{
683 PRECICE_TRACE(rankSender);
684
685 rankSender = adjustRank(rankSender);
686
687 PRECICE_ASSERT(rankSender >= 0, rankSender);
689
690 try {
691 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(bool)));
692 } catch (std::exception &e) {
693 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
694 }
695}
696
697PtrRequest SocketCommunication::aReceive(bool &itemToReceive, Rank rankSender)
698{
699 PRECICE_TRACE(rankSender);
700
701 rankSender = adjustRank(rankSender);
702
703 PRECICE_ASSERT(rankSender >= 0, rankSender);
705
706 PtrRequest request(new SocketRequest);
707
708 try {
709 asio::async_read(*_sockets[rankSender],
710 asio::buffer(&itemToReceive, sizeof(bool)),
711 [request](boost::system::error_code const &, std::size_t) {
712 std::static_pointer_cast<SocketRequest>(request)->complete();
713 });
714 } catch (std::exception &e) {
715 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
716 }
717
718 return request;
719}
720
721#ifndef _WIN32
722namespace {
723struct Interface {
724 unsigned int index;
727};
728
729std::vector<Interface> detectInterfaces()
730{
731 std::vector<Interface> interfaces;
732
733 // Collect interface indices and names
734 struct if_nameindex *nameInterface = if_nameindex();
735 for (struct if_nameindex *itNameInterface = nameInterface; itNameInterface->if_index != 0; ++itNameInterface) {
736 Interface interface;
737 interface.index = itNameInterface->if_index;
738 interface.name = itNameInterface->if_name;
739 interfaces.emplace_back(std::move(interface));
740 }
741 if_freenameindex(nameInterface);
742
743 // Resolve addresses for interfaces
744 for (auto &interface : interfaces) {
745 struct ifreq request;
746 strncpy(request.ifr_name,
747 interface.name.c_str(),
748 IFNAMSIZ - 1); // Copy interface name
749
750 auto socketfd = socket(AF_INET, SOCK_STREAM, 0);
751 if (socketfd == -1) {
752 continue;
753 }
754 auto err = ioctl(socketfd, SIOCGIFADDR, &request);
755 close(socketfd);
756 if (err) {
757 continue;
758 }
759
760 const char *addr = inet_ntoa((reinterpret_cast<struct sockaddr_in *>(&request.ifr_addr))->sin_addr);
761 if (!addr) {
762 continue;
763 }
764 interface.address = addr;
765 }
766
767 return interfaces;
768}
769} // namespace
770#endif
771
773{
775
776#ifdef _WIN32
777 return "127.0.0.1";
778#else
779
780 PRECICE_DEBUG("Looking for IP address of network \"{}\"", _networkName);
781
782 auto interfaces = detectInterfaces();
783
784 auto pos = std::find_if(interfaces.begin(), interfaces.end(),
785 [&](Interface const &interface) { return interface.name == _networkName; });
786 if (pos == interfaces.end()) {
787 PRECICE_DEBUG("There's NOTHING");
789 err << "Cannot find network interface \"" << _networkName << "\". Available interfaces are: ";
790 for (const auto &interface : interfaces) {
791 err << interface.name << ' ';
792 }
793 err << " Please check \"network\" attributes in your configuration file.";
794 PRECICE_ERROR(err.str());
795 }
796
797 // Unconnected interfaces don't have an IP address.
798 PRECICE_CHECK(not pos->address.empty(), "The interface \"{}\" does not have an IP address. Please select another interface.", _networkName);
799
800 PRECICE_DEBUG("Detected network IP address of interface \"{}\": {}.", _networkName, pos->address);
801 return pos->address;
802#endif
803}
804
805} // namespace precice::com
#define PRECICE_ERROR(...)
Definition LogMacros.hpp:16
#define PRECICE_WARN(...)
Definition LogMacros.hpp:12
#define PRECICE_DEBUG(...)
Definition LogMacros.hpp:61
#define PRECICE_TRACE(...)
Definition LogMacros.hpp:92
#define PRECICE_CHECK(check,...)
Definition LogMacros.hpp:32
unsigned int index
std::string address
std::string name
#define PRECICE_ASSERT(...)
Definition assertion.hpp:85
T begin(T... args)
T c_str(T... args)
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true, if a connection to a remote participant has been setup.
virtual int adjustRank(Rank rank) const
Adjusts the given rank bases on the _rankOffset.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block, if the the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Write the string info, e.g. IP:port to the connection info file.
Implements Communication by using sockets.
std::unique_ptr< WorkGuard > _workGuard
PtrRequest aReceive(precice::span< double > itemsToReceive, int rankSender) override
Asynchronously receives an array of double values.
void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
void send(std::string const &itemToSend, Rank rankReceiver) override
Sends a std::string to process with given rank.
std::string _addressDirectory
Directory where IP address is exchanged by file.
void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
Accepts connection from another communicator, which has to call requestConnectionAsClient().
std::shared_ptr< IOContext > _ioContext
void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
std::string _networkName
Name of network to communicate over.
size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
std::map< int, std::shared_ptr< Socket > > _sockets
Remote rank -> socket map.
void receive(std::string &itemToReceive, Rank rankSender) override
Receives a std::string from process with given rank.
void closeConnection() override
Disconnects from communication space, i.e. participant.
void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
unsigned short _portNumber
Port used for socket connection.
SocketCommunication(unsigned short portNumber=0, bool reuseAddress=false, std::string networkName=utils::networking::loopbackInterfaceName(), std::string addressDirectory=".")
PtrRequest aSend(precice::span< const int > itemsToSend, Rank rankReceiver) override
Asynchronously sends an array of integer values.
void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
void dispatch(std::shared_ptr< Socket > sock, boost::asio::const_buffer data, std::function< void()> callback)
Put data in the queue, start processing the queue.
A C++ 11 implementation of the non-owning C++20 std::span type.
Definition span.hpp:284
constexpr pointer data() const noexcept
Definition span.hpp:500
constexpr size_type size() const noexcept
Definition span.hpp:469
T count(T... args)
T data(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T find(T... args)
T generic_string(T... args)
T join(T... args)
T joinable(T... args)
T make_unique(T... args)
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.
int Rank
Definition Types.hpp:37
STL namespace.
T reset(T... args)
T size(T... args)
T stoul(T... args)
T str(T... args)
T strncpy(T... args)
T substr(T... args)
T to_string(T... args)
T what(T... args)