preCICE v3.3.0
Loading...
Searching...
No Matches
SocketCommunication.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <boost/asio.hpp>
3#include <boost/asio/deadline_timer.hpp>
4
5#include <filesystem>
6#include <sstream>
7#include <stdexcept>
8#include <utility>
9
12#include "SocketRequest.hpp"
13#include "logging/LogMacros.hpp"
15#include "utils/assertion.hpp"
16#include "utils/networking.hpp"
17#include "utils/span_tools.hpp"
18
19namespace precice::com {
20
21namespace asio = boost::asio;
22
24 bool reuseAddress,
25 std::string networkName,
26 std::string addressDirectory)
27 : _portNumber(portNumber),
28 _reuseAddress(reuseAddress),
29 _networkName(std::move(networkName)),
30 _addressDirectory(std::move(addressDirectory)),
32{
33 if (_addressDirectory.empty()) {
34 _addressDirectory = ".";
35 }
36}
37
39 : SocketCommunication(0, false, utils::networking::loopbackInterfaceName(), addressDirectory)
40{
41}
42
48
55
57 std::string const &requesterName,
58 std::string const &tag,
59 int acceptorRank,
60 int rankOffset)
61{
62 PRECICE_TRACE(acceptorName, requesterName);
63
65
66 setRankOffset(rankOffset);
67
68 std::string address;
69
70 try {
71 std::string ipAddress = getIpAddress();
72 PRECICE_CHECK(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
73
74 using asio::ip::tcp;
75
76 tcp::acceptor acceptor(*_ioContext);
77 tcp::endpoint endpoint(tcp::v4(), _portNumber);
78
79 acceptor.open(endpoint.protocol());
80 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
81 acceptor.bind(endpoint);
82 acceptor.listen();
83
84 _portNumber = acceptor.local_endpoint().port();
85 address = ipAddress + ":" + std::to_string(_portNumber);
86 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, _addressDirectory);
87 conInfo.write(address);
88 PRECICE_DEBUG("Accept connection at {}", address);
89
90 int peerCurrent = 0; // Current peer to connect to
91 int peerCount = -1; // The total count of peers (initialized in the first iteration)
92 int requesterCommunicatorSize = -1;
93
94 do {
96
97 acceptor.accept(*socket);
98 boost::asio::ip::tcp::no_delay option(true);
99 socket->set_option(option);
100
101 PRECICE_DEBUG("Accepted connection at {}", address);
102 _isConnected = true;
103
104 int requesterRank = -1;
105
106 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
107
108 PRECICE_ASSERT(_sockets.count(requesterRank) == 0,
109 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
110
111 _sockets[requesterRank] = std::move(socket);
112 // send and receive expect a rank from the acceptor perspective.
113 // Thus we need to apply given rankOffset before passing it to send/receive.
114 // This is essentially the inverse of adjustRank().
115 auto adjustedRequesterRank = requesterRank + rankOffset;
116 send(acceptorRank, adjustedRequesterRank);
117 receive(requesterCommunicatorSize, adjustedRequesterRank);
118
119 // Initialize the count of peers to connect to
120 if (peerCurrent == 0) {
121 peerCount = requesterCommunicatorSize;
122 }
123
124 PRECICE_ASSERT(requesterCommunicatorSize > 0,
125 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
126 PRECICE_ASSERT(requesterCommunicatorSize == peerCount,
127 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
128 } while (++peerCurrent < requesterCommunicatorSize);
129
130 acceptor.close();
131 } catch (std::exception &e) {
132 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
133 }
134
135 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
136 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
137 _thread = std::thread([this] { _ioContext->run(); });
138}
139
141 std::string const &requesterName,
142 std::string const &tag,
143 int acceptorRank,
144 int requesterCommunicatorSize)
145{
146 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
147 PRECICE_ASSERT(requesterCommunicatorSize >= 0, "Requester communicator size has to be positive.");
149
150 if (requesterCommunicatorSize == 0) {
151 PRECICE_DEBUG("Accepting no connections.");
152 _isConnected = true;
153 return;
154 }
155
156 std::string address;
157
158 try {
159 std::string ipAddress = getIpAddress();
160 PRECICE_ASSERT(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
161
162 using asio::ip::tcp;
163
164 tcp::acceptor acceptor(*_ioContext);
165 {
166 tcp::endpoint endpoint(tcp::v4(), _portNumber);
167
168 acceptor.open(endpoint.protocol());
169 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
170 acceptor.bind(endpoint);
171 acceptor.listen();
172
173 _portNumber = acceptor.local_endpoint().port();
174 }
175
176 address = ipAddress + ":" + std::to_string(_portNumber);
177 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
178 conInfo.write(address);
179
180 PRECICE_DEBUG("Accepting connection at {}", address);
181
182 for (int connection = 0; connection < requesterCommunicatorSize; ++connection) {
183 auto socket = std::make_shared<Socket>(*_ioContext);
184 acceptor.accept(*socket);
185 boost::asio::ip::tcp::no_delay option(true);
186 socket->set_option(option);
187 PRECICE_DEBUG("Accepted connection at {}", address);
188 _isConnected = true;
189
190 int requesterRank;
191 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
192 _sockets[requesterRank] = std::move(socket);
193 }
194
195 acceptor.close();
196 } catch (std::exception &e) {
197 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
198 }
199
200 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
201 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
202 _thread = std::thread([this] { _ioContext->run(); });
203}
204
206 std::string const &requesterName,
207 std::string const &tag,
208 int requesterRank,
209 int requesterCommunicatorSize)
210{
211 PRECICE_TRACE(acceptorName, requesterName);
213
214 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, _addressDirectory);
215 std::string const address = conInfo.read();
216 PRECICE_DEBUG("Request connection to {}", address);
217 auto const sepidx = address.find(':');
218 std::string const ipAddress = address.substr(0, sepidx);
219 std::string const portNumber = address.substr(sepidx + 1);
220 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
221
222 try {
223 auto socket = std::make_shared<Socket>(*_ioContext);
224
225 using asio::ip::tcp;
226
227 while (not isConnected()) {
228 tcp::resolver resolver(*_ioContext);
229 auto results = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
230
231 auto endpoint = results.begin()->endpoint();
232 boost::system::error_code error = asio::error::host_not_found;
233 socket->connect(endpoint, error);
234
235 _isConnected = not error;
236
237 if (not isConnected()) {
238 // Wait a little, since after a couple of ten-thousand trials the system
239 // seems to get confused and the requester connects wrongly to itself.
240 boost::asio::deadline_timer timer(*_ioContext, boost::posix_time::milliseconds(1));
241 timer.wait();
242 }
243 }
244 boost::asio::ip::tcp::no_delay option(true);
245 socket->set_option(option);
246
247 PRECICE_DEBUG("Requested connection to {}", address);
248
249 asio::write(*socket, asio::buffer(&requesterRank, sizeof(int)));
250
251 int acceptorRank = -1;
252 asio::read(*socket, asio::buffer(&acceptorRank, sizeof(int)));
253 _sockets[0] = std::move(socket); // should be acceptorRank instead of 0, likewise all communication below
254
255 send(requesterCommunicatorSize, 0);
256
257 } catch (std::exception &e) {
258 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
259 }
260
261 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
262 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
263 _thread = std::thread([this] { _ioContext->run(); });
264}
265
267 std::string const &requesterName,
268 std::string const &tag,
269 std::set<int> const &acceptorRanks,
270 int requesterRank)
271
272{
273 PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
275
276 for (auto const &acceptorRank : acceptorRanks) {
277 _isConnected = false;
278 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
279 std::string const address = conInfo.read();
280 auto const sepidx = address.find(':');
281 std::string const ipAddress = address.substr(0, sepidx);
282 std::string const portNumber = address.substr(sepidx + 1);
283 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
284
285 try {
286 auto socket = std::make_shared<Socket>(*_ioContext);
287
288 using asio::ip::tcp;
289
290 PRECICE_DEBUG("Requesting connection to {}, port {}", ipAddress, portNumber);
291
292 while (not isConnected()) {
293 tcp::resolver resolver(*_ioContext);
294 auto endpoints = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
295
296 boost::system::error_code error = asio::error::host_not_found;
297 boost::asio::connect(*socket, endpoints, error);
298
299 _isConnected = not error;
300
301 if (not isConnected()) {
302 // Wait a little, since after a couple of ten-thousand trials the system
303 // seems to get confused and the requester connects wrongly to itself.
304 boost::asio::deadline_timer timer(*_ioContext, boost::posix_time::milliseconds(1));
305 timer.wait();
306 }
307 }
308 boost::asio::ip::tcp::no_delay option(true);
309 socket->set_option(option);
310
311 PRECICE_DEBUG("Requested connection to {}, rank = {}", address, acceptorRank);
312 _sockets[acceptorRank] = std::move(socket);
313 send(requesterRank, acceptorRank); // send my rank
314
315 } catch (std::exception &e) {
316 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
317 }
318 }
319 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
320 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
321 _thread = std::thread([this] { _ioContext->run(); });
322}
323
325{
327
328 if (not isConnected())
329 return;
330
331 if (_thread.joinable()) {
332 _workGuard.reset();
333 _ioContext->stop();
334 _thread.join();
335 }
336
337 for (auto &socket : _sockets) {
338 PRECICE_ASSERT(socket.second->is_open());
339
340 try {
341 socket.second->shutdown(Socket::shutdown_send);
342 socket.second->close();
343 } catch (std::exception &e) {
344 PRECICE_WARN("Socket shutdown failed with system error: {}", e.what());
345 }
346 }
347
348 _isConnected = false;
349}
350
351void SocketCommunication::send(std::string const &itemToSend, Rank rankReceiver)
352{
353 PRECICE_TRACE(itemToSend, rankReceiver);
354
355 rankReceiver = adjustRank(rankReceiver);
356
357 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
359
360 size_t size = itemToSend.size() + 1;
361 try {
362 asio::write(*_sockets[rankReceiver], asio::buffer(&size, sizeof(size_t)));
363 asio::write(*_sockets[rankReceiver], asio::buffer(itemToSend.c_str(), size));
364 } catch (std::exception &e) {
365 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
366 }
367}
368
370{
371 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
372
373 rankReceiver = adjustRank(rankReceiver);
374
375 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
377
378 try {
379 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)));
380 } catch (std::exception &e) {
381 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
382 }
383}
384
386 std::string const &requesterName)
387{
388 using namespace std::filesystem;
389 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
390 PRECICE_DEBUG("Creating connection exchange directory {}", dir.generic_string());
391 try {
393 } catch (const std::filesystem::filesystem_error &e) {
394 PRECICE_WARN("Creating directory for connection info failed with filesystem error: {}", e.what());
395 }
396}
397
399 std::string const &requesterName)
400{
401 using namespace std::filesystem;
402 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
403 PRECICE_DEBUG("Removing connection exchange directory {}", dir.generic_string());
404 try {
405 remove_all(dir);
406 } catch (const std::filesystem::filesystem_error &e) {
407 PRECICE_WARN("Cleaning up connection info failed with filesystem error {}", e.what());
408 }
409}
410
412{
413 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
414
415 rankReceiver = adjustRank(rankReceiver);
416
417 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
419
420 PtrRequest request(new SocketRequest);
421
422 _queue.dispatch(_sockets[rankReceiver],
423 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)),
424 [request] {
425 std::static_pointer_cast<SocketRequest>(request)->complete();
426 });
427 return request;
428}
429
431{
432 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
433
434 rankReceiver = adjustRank(rankReceiver);
435
436 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
438
439 try {
440 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)));
441 } catch (std::exception &e) {
442 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
443 }
444}
445
447{
448 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
449
450 rankReceiver = adjustRank(rankReceiver);
451
452 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
454
455 PtrRequest request(new SocketRequest);
456
457 _queue.dispatch(_sockets[rankReceiver],
458 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)),
459 [request] {
460 std::static_pointer_cast<SocketRequest>(request)->complete();
461 });
462 return request;
463}
464
465void SocketCommunication::send(double itemToSend, Rank rankReceiver)
466{
467 PRECICE_TRACE(itemToSend, rankReceiver);
468
469 rankReceiver = adjustRank(rankReceiver);
470
471 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
473
474 try {
475 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(double)));
476 } catch (std::exception &e) {
477 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
478 }
479}
480
481PtrRequest SocketCommunication::aSend(const double &itemToSend, Rank rankReceiver)
482{
483 return aSend(precice::refToSpan<const double>(itemToSend), rankReceiver);
484}
485
486void SocketCommunication::send(int itemToSend, Rank rankReceiver)
487{
488 PRECICE_TRACE(itemToSend, rankReceiver);
489
490 rankReceiver = adjustRank(rankReceiver);
491
492 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
494
495 try {
496 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(int)));
497 } catch (std::exception &e) {
498 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
499 }
500}
501
502PtrRequest SocketCommunication::aSend(const int &itemToSend, Rank rankReceiver)
503{
504 return aSend(precice::refToSpan<const int>(itemToSend), rankReceiver);
505}
506
507void SocketCommunication::send(bool itemToSend, Rank rankReceiver)
508{
509 PRECICE_TRACE(itemToSend, rankReceiver);
510
511 rankReceiver = adjustRank(rankReceiver);
512
513 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
515
516 try {
517 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(bool)));
518 } catch (std::exception &e) {
519 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
520 }
521}
522
523PtrRequest SocketCommunication::aSend(const bool &itemToSend, Rank rankReceiver)
524{
525 PRECICE_TRACE(rankReceiver);
526
527 rankReceiver = adjustRank(rankReceiver);
528
529 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
531
532 PtrRequest request(new SocketRequest);
533
534 _queue.dispatch(_sockets[rankReceiver],
535 asio::buffer(&itemToSend, sizeof(bool)),
536 [request] {
537 std::static_pointer_cast<SocketRequest>(request)->complete();
538 });
539 return request;
540}
541
542void SocketCommunication::receive(std::string &itemToReceive, Rank rankSender)
543{
544 PRECICE_TRACE(rankSender);
545
546 rankSender = adjustRank(rankSender);
547
548 PRECICE_ASSERT(rankSender >= 0, rankSender);
550
551 size_t size = 0;
552
553 try {
554 asio::read(*_sockets[rankSender], asio::buffer(&size, sizeof(size_t)));
556 asio::read(*_sockets[rankSender], asio::buffer(msg.data(), size));
557 itemToReceive = msg.data();
558 } catch (std::exception &e) {
559 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
560 }
561}
562
564{
565 PRECICE_TRACE(itemsToReceive.size(), rankSender);
566
567 rankSender = adjustRank(rankSender);
568
569 PRECICE_ASSERT(rankSender >= 0, rankSender);
571
572 try {
573 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(int)));
574 } catch (std::exception &e) {
575 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
576 }
577}
578
580{
581 PRECICE_TRACE(itemsToReceive.size(), rankSender);
582
583 rankSender = adjustRank(rankSender);
584
585 PRECICE_ASSERT(rankSender >= 0, rankSender);
587
588 try {
589 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)));
590 } catch (std::exception &e) {
591 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
592 }
593}
594
596 int rankSender)
597{
598 PRECICE_TRACE(itemsToReceive.size(), rankSender);
599
600 rankSender = adjustRank(rankSender);
601
602 PRECICE_ASSERT(rankSender >= 0, rankSender);
604
605 PtrRequest request(new SocketRequest);
606
607 try {
608 asio::async_read(*_sockets[rankSender],
609 asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)),
610 [request](boost::system::error_code const &, std::size_t) {
611 std::static_pointer_cast<SocketRequest>(request)->complete();
612 });
613 } catch (std::exception &e) {
614 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
615 }
616
617 return request;
618}
619
620void SocketCommunication::receive(double &itemToReceive, Rank rankSender)
621{
622 PRECICE_TRACE(rankSender);
623
624 rankSender = adjustRank(rankSender);
625
626 PRECICE_ASSERT(rankSender >= 0, rankSender);
628
629 try {
630 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(double)));
631 } catch (std::exception &e) {
632 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
633 }
634}
635
636PtrRequest SocketCommunication::aReceive(double &itemToReceive, Rank rankSender)
637{
638 return aReceive(precice::refToSpan<double>(itemToReceive), rankSender);
639}
640
641void SocketCommunication::receive(int &itemToReceive, Rank rankSender)
642{
643 PRECICE_TRACE(rankSender);
644
645 rankSender = adjustRank(rankSender);
646
647 PRECICE_ASSERT(rankSender >= 0, rankSender);
649
650 try {
651 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(int)));
652 } catch (std::exception &e) {
653 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
654 }
655}
656
657PtrRequest SocketCommunication::aReceive(int &itemToReceive, Rank rankSender)
658{
659 PRECICE_TRACE(rankSender);
660
661 rankSender = adjustRank(rankSender);
662
663 PRECICE_ASSERT((rankSender >= 0) && (rankSender < (int) _sockets.size()),
664 rankSender, _sockets.size());
666
667 PtrRequest request(new SocketRequest);
668
669 try {
670 asio::async_read(*_sockets[rankSender],
671 asio::buffer(&itemToReceive, sizeof(int)),
672 [request](boost::system::error_code const &, std::size_t) {
673 std::static_pointer_cast<SocketRequest>(request)->complete();
674 });
675 } catch (std::exception &e) {
676 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
677 }
678
679 return request;
680}
681
682void SocketCommunication::receive(bool &itemToReceive, Rank rankSender)
683{
684 PRECICE_TRACE(rankSender);
685
686 rankSender = adjustRank(rankSender);
687
688 PRECICE_ASSERT(rankSender >= 0, rankSender);
690
691 try {
692 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(bool)));
693 } catch (std::exception &e) {
694 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
695 }
696}
697
698PtrRequest SocketCommunication::aReceive(bool &itemToReceive, Rank rankSender)
699{
700 PRECICE_TRACE(rankSender);
701
702 rankSender = adjustRank(rankSender);
703
704 PRECICE_ASSERT(rankSender >= 0, rankSender);
706
707 PtrRequest request(new SocketRequest);
708
709 try {
710 asio::async_read(*_sockets[rankSender],
711 asio::buffer(&itemToReceive, sizeof(bool)),
712 [request](boost::system::error_code const &, std::size_t) {
713 std::static_pointer_cast<SocketRequest>(request)->complete();
714 });
715 } catch (std::exception &e) {
716 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
717 }
718
719 return request;
720}
721
#ifndef _WIN32
namespace {
/// One local network interface as enumerated by detectInterfaces().
struct Interface {
  unsigned int index;   ///< Interface index as reported by if_nameindex().
  std::string  name;    ///< Interface name as reported by if_nameindex().
  std::string  address; ///< Textual IPv4 address; left empty when none could be queried.
};

/**
 * @brief Enumerates the local network interfaces together with their IPv4 addresses.
 *
 * Names and indices come from if_nameindex(). For every interface found, the
 * address is then queried with the SIOCGIFADDR ioctl on a temporary AF_INET
 * socket. An interface whose query fails at any step stays in the result with
 * an empty address.
 *
 * @return All enumerated interfaces; empty if the enumeration itself failed.
 */
std::vector<Interface> detectInterfaces()
{
  std::vector<Interface> interfaces;

  // Collect interface indices and names
  struct if_nameindex *nameInterface = if_nameindex();
  if (nameInterface == nullptr) {
    // if_nameindex() signals failure with a null pointer, which must not be walked.
    return interfaces;
  }
  // The returned array is terminated by an entry whose if_index is 0.
  for (const struct if_nameindex *itNameInterface = nameInterface; itNameInterface->if_index != 0; ++itNameInterface) {
    Interface interface;
    interface.index = itNameInterface->if_index;
    interface.name  = itNameInterface->if_name;
    interfaces.emplace_back(std::move(interface));
  }
  if_freenameindex(nameInterface);

  // Resolve addresses for interfaces
  for (auto &interface : interfaces) {
    // Zero-initialised so that ifr_name stays NUL-terminated even when the
    // name fills all IFNAMSIZ - 1 copied characters (strncpy adds no
    // terminator in that case).
    struct ifreq request {};
    strncpy(request.ifr_name,
            interface.name.c_str(),
            IFNAMSIZ - 1); // Copy interface name

    const int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd == -1) {
      continue;
    }
    const int err = ioctl(socketfd, SIOCGIFADDR, &request);
    close(socketfd);
    if (err) {
      continue;
    }

    // inet_ntop writes into a caller-owned buffer, whereas inet_ntoa hands out
    // a pointer to storage shared between all of its callers.
    char        text[INET_ADDRSTRLEN] = {};
    const auto *ipv4                  = reinterpret_cast<const struct sockaddr_in *>(&request.ifr_addr);
    if (inet_ntop(AF_INET, &ipv4->sin_addr, text, sizeof(text)) == nullptr) {
      continue;
    }
    interface.address = text;
  }

  return interfaces;
}
} // namespace
#endif
772
774{
776
777#ifdef _WIN32
778 return "127.0.0.1";
779#else
780
781 PRECICE_DEBUG("Looking for IP address of network \"{}\"", _networkName);
782
783 auto interfaces = detectInterfaces();
784
785 auto pos = std::find_if(interfaces.begin(), interfaces.end(),
786 [&](Interface const &interface) { return interface.name == _networkName; });
787 if (pos == interfaces.end()) {
788 PRECICE_DEBUG("There's NOTHING");
790 err << "Cannot find network interface \"" << _networkName << "\". Available interfaces are: ";
791 for (const auto &interface : interfaces) {
792 err << interface.name << ' ';
793 }
794 err << " Please check \"network\" attributes in your configuration file.";
795 PRECICE_ERROR(err.str());
796 }
797
798 // Unconnected interfaces don't have an IP address.
799 PRECICE_CHECK(not pos->address.empty(), "The interface \"{}\" does not have an IP address. Please select another interface.", _networkName);
800
801 PRECICE_DEBUG("Detected network IP address of interface \"{}\": {}.", _networkName, pos->address);
802 return pos->address;
803#endif
804}
805
806} // namespace precice::com
#define PRECICE_ERROR(...)
Definition LogMacros.hpp:16
#define PRECICE_WARN(...)
Definition LogMacros.hpp:12
#define PRECICE_DEBUG(...)
Definition LogMacros.hpp:61
#define PRECICE_TRACE(...)
Definition LogMacros.hpp:92
#define PRECICE_CHECK(check,...)
Definition LogMacros.hpp:32
#define PRECICE_ASSERT(...)
Definition assertion.hpp:85
T c_str(T... args)
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true, if a connection to a remote participant has been setup.
virtual int adjustRank(Rank rank) const
Adjusts the given rank based on the _rankOffset.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block if the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Write the string info, e.g. IP:port to the connection info file.
std::unique_ptr< WorkGuard > _workGuard
PtrRequest aReceive(precice::span< double > itemsToReceive, int rankSender) override
Asynchronously receives an array of double values.
void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
void send(std::string const &itemToSend, Rank rankReceiver) override
Sends a std::string to process with given rank.
std::string _addressDirectory
Directory where IP address is exchanged by file.
void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
Accepts connection from another communicator, which has to call requestConnectionAsClient().
std::shared_ptr< IOContext > _ioContext
void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
std::string _networkName
Name of network to communicate over.
size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
std::map< int, std::shared_ptr< Socket > > _sockets
Remote rank -> socket map.
void receive(std::string &itemToReceive, Rank rankSender) override
Receives a std::string from process with given rank.
void closeConnection() override
Disconnects from communication space, i.e. participant.
void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
unsigned short _portNumber
Port used for socket connection.
SocketCommunication(unsigned short portNumber=0, bool reuseAddress=false, std::string networkName=utils::networking::loopbackInterfaceName(), std::string addressDirectory=".")
PtrRequest aSend(precice::span< const int > itemsToSend, Rank rankReceiver) override
Asynchronously sends an array of integer values.
void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
A C++ 11 implementation of the non-owning C++20 std::span type.
Definition span.hpp:284
constexpr pointer data() const noexcept
Definition span.hpp:500
constexpr size_type size() const noexcept
Definition span.hpp:469
T create_directories(T... args)
T data(T... args)
T emplace_back(T... args)
T empty(T... args)
T find(T... args)
T generic_string(T... args)
T make_shared(T... args)
T make_unique(T... args)
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.
std::shared_ptr< Request > PtrRequest
contains precice-related utilities.
int Rank
Definition Types.hpp:37
auto refToSpan(T &element)
Wraps a single element into a span.
Definition span_tools.hpp:9
STL namespace.
T static_pointer_cast(T... args)
T remove_all(T... args)
T size(T... args)
T stoul(T... args)
T str(T... args)
T strncpy(T... args)
T substr(T... args)
T to_string(T... args)
T what(T... args)