preCICE v3.1.1
Loading...
Searching...
No Matches
SocketCommunication.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <boost/asio.hpp>
3
4#include <filesystem>
5#include <sstream>
6#include <stdexcept>
7#include <utility>
8
11#include "SocketRequest.hpp"
12#include "logging/LogMacros.hpp"
14#include "utils/assertion.hpp"
15#include "utils/networking.hpp"
16#include "utils/span_tools.hpp"
17
18namespace precice::com {
19
20namespace asio = boost::asio;
21
23 bool reuseAddress,
24 std::string networkName,
25 std::string addressDirectory)
26 : _portNumber(portNumber),
27 _reuseAddress(reuseAddress),
28 _networkName(std::move(networkName)),
29 _addressDirectory(std::move(addressDirectory)),
30 _ioService(new IOService)
31{
34 }
35}
36
38 : SocketCommunication(0, false, utils::networking::loopbackInterfaceName(), addressDirectory)
39{
40}
41
47
54
56 std::string const &requesterName,
57 std::string const &tag,
58 int acceptorRank,
59 int rankOffset)
60{
61 PRECICE_TRACE(acceptorName, requesterName);
62
64
65 setRankOffset(rankOffset);
66
68
69 try {
70 std::string ipAddress = getIpAddress();
71 PRECICE_CHECK(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
72
73 using asio::ip::tcp;
74
75 tcp::acceptor acceptor(*_ioService);
76 tcp::endpoint endpoint(tcp::v4(), _portNumber);
77
78 acceptor.open(endpoint.protocol());
79 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
80 acceptor.bind(endpoint);
81 acceptor.listen();
82
83 _portNumber = acceptor.local_endpoint().port();
84 address = ipAddress + ":" + std::to_string(_portNumber);
85 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, _addressDirectory);
86 conInfo.write(address);
87 PRECICE_DEBUG("Accept connection at {}", address);
88
89 int peerCurrent = 0; // Current peer to connect to
90 int peerCount = -1; // The total count of peers (initialized in the first iteration)
91 int requesterCommunicatorSize = -1;
92
93 do {
94 auto socket = std::make_shared<Socket>(*_ioService);
95
96 acceptor.accept(*socket);
97 PRECICE_DEBUG("Accepted connection at {}", address);
98 _isConnected = true;
99
100 int requesterRank = -1;
101
102 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
103
104 PRECICE_ASSERT(_sockets.count(requesterRank) == 0,
105 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
106
107 _sockets[requesterRank] = std::move(socket);
108 // send and receive expect a rank from the acceptor perspective.
109 // Thus we need to apply given rankOffset before passing it to send/receive.
110 // This is essentially the inverse of adjustRank().
111 auto adjustedRequesterRank = requesterRank + rankOffset;
112 send(acceptorRank, adjustedRequesterRank);
113 receive(requesterCommunicatorSize, adjustedRequesterRank);
114
115 // Initialize the count of peers to connect to
116 if (peerCurrent == 0) {
117 peerCount = requesterCommunicatorSize;
118 }
119
120 PRECICE_ASSERT(requesterCommunicatorSize > 0,
121 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
122 PRECICE_ASSERT(requesterCommunicatorSize == peerCount,
123 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
124 } while (++peerCurrent < requesterCommunicatorSize);
125
126 acceptor.close();
127 } catch (std::exception &e) {
128 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
129 }
130
131 // NOTE:
132 // Keep IO service running so that it fires asynchronous handlers from another thread.
133 _work = std::make_shared<asio::io_service::work>(*_ioService);
134 _thread = std::thread([this] { _ioService->run(); });
135}
136
138 std::string const &requesterName,
139 std::string const &tag,
140 int acceptorRank,
141 int requesterCommunicatorSize)
142{
143 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
144 PRECICE_ASSERT(requesterCommunicatorSize >= 0, "Requester communicator size has to be positive.");
146
147 if (requesterCommunicatorSize == 0) {
148 PRECICE_DEBUG("Accepting no connections.");
149 _isConnected = true;
150 return;
151 }
152
154
155 try {
156 std::string ipAddress = getIpAddress();
157 PRECICE_ASSERT(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
158
159 using asio::ip::tcp;
160
161 tcp::acceptor acceptor(*_ioService);
162 {
163 tcp::endpoint endpoint(tcp::v4(), _portNumber);
164
165 acceptor.open(endpoint.protocol());
166 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
167 acceptor.bind(endpoint);
168 acceptor.listen();
169
170 _portNumber = acceptor.local_endpoint().port();
171 }
172
173 address = ipAddress + ":" + std::to_string(_portNumber);
174 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
175 conInfo.write(address);
176
177 PRECICE_DEBUG("Accepting connection at {}", address);
178
179 for (int connection = 0; connection < requesterCommunicatorSize; ++connection) {
180 auto socket = std::make_shared<Socket>(*_ioService);
181 acceptor.accept(*socket);
182 PRECICE_DEBUG("Accepted connection at {}", address);
183 _isConnected = true;
184
185 int requesterRank;
186 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
187 _sockets[requesterRank] = std::move(socket);
188 }
189
190 acceptor.close();
191 } catch (std::exception &e) {
192 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
193 }
194
195 // NOTE: Keep IO service running so that it fires asynchronous handlers from another thread.
196 _work = std::make_shared<asio::io_service::work>(*_ioService);
197 _thread = std::thread([this] { _ioService->run(); });
198}
199
201 std::string const &requesterName,
202 std::string const &tag,
203 int requesterRank,
204 int requesterCommunicatorSize)
205{
206 PRECICE_TRACE(acceptorName, requesterName);
208
209 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, _addressDirectory);
210 std::string const address = conInfo.read();
211 PRECICE_DEBUG("Request connection to {}", address);
212 auto const sepidx = address.find(':');
213 std::string const ipAddress = address.substr(0, sepidx);
214 std::string const portNumber = address.substr(sepidx + 1);
215 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
216
217 try {
218 auto socket = std::make_shared<Socket>(*_ioService);
219
220 using asio::ip::tcp;
221
222 tcp::resolver::query query(tcp::v4(), ipAddress, portNumber, tcp::resolver::query::numeric_host);
223
224 while (not isConnected()) {
225 tcp::resolver resolver(*_ioService);
226 tcp::resolver::endpoint_type endpoint = *(resolver.resolve(query));
227 boost::system::error_code error = asio::error::host_not_found;
228 socket->connect(endpoint, error);
229
230 _isConnected = not error;
231
232 if (not isConnected()) {
233 // Wait a little, since after a couple of ten-thousand trials the system
234 // seems to get confused and the requester connects wrongly to itself.
235 boost::asio::deadline_timer timer(*_ioService, boost::posix_time::milliseconds(1));
236 timer.wait();
237 }
238 }
239
240 PRECICE_DEBUG("Requested connection to {}", address);
241
242 asio::write(*socket, asio::buffer(&requesterRank, sizeof(int)));
243
244 int acceptorRank = -1;
245 asio::read(*socket, asio::buffer(&acceptorRank, sizeof(int)));
246 _sockets[0] = std::move(socket); // should be acceptorRank instead of 0, likewise all communication below
247
248 send(requesterCommunicatorSize, 0);
249
250 } catch (std::exception &e) {
251 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
252 }
253
254 // NOTE: Keep IO service running so that it fires asynchronous handlers from another thread.
255 _work = std::make_shared<asio::io_service::work>(*_ioService);
256 _thread = std::thread([this] { _ioService->run(); });
257}
258
260 std::string const & requesterName,
261 std::string const & tag,
262 std::set<int> const &acceptorRanks,
263 int requesterRank)
264
265{
266 PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
268
269 for (auto const &acceptorRank : acceptorRanks) {
270 _isConnected = false;
271 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
272 std::string const address = conInfo.read();
273 auto const sepidx = address.find(':');
274 std::string const ipAddress = address.substr(0, sepidx);
275 std::string const portNumber = address.substr(sepidx + 1);
276 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
277
278 try {
279 auto socket = std::make_shared<Socket>(*_ioService);
280
281 using asio::ip::tcp;
282
283 PRECICE_DEBUG("Requesting connection to {}, port {}", ipAddress, portNumber);
284
285 tcp::resolver::query query(tcp::v4(), ipAddress, portNumber, tcp::resolver::query::numeric_host);
286
287 while (not isConnected()) {
288 tcp::resolver resolver(*_ioService);
289 tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
290 boost::system::error_code error = asio::error::host_not_found;
291 boost::asio::connect(*socket, std::move(endpoint_iterator), error);
292
293 _isConnected = not error;
294
295 if (not isConnected()) {
296 // Wait a little, since after a couple of ten-thousand trials the system
297 // seems to get confused and the requester connects wrongly to itself.
298 boost::asio::deadline_timer timer(*_ioService, boost::posix_time::milliseconds(1));
299 timer.wait();
300 }
301 }
302
303 PRECICE_DEBUG("Requested connection to {}, rank = {}", address, acceptorRank);
304 _sockets[acceptorRank] = std::move(socket);
305 send(requesterRank, acceptorRank); // send my rank
306
307 } catch (std::exception &e) {
308 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
309 }
310 }
311 // NOTE: Keep IO service running so that it fires asynchronous handlers from another thread.
312 _work = std::make_shared<asio::io_service::work>(*_ioService);
313 _thread = std::thread([this] { _ioService->run(); });
314}
315
317{
319
320 if (not isConnected())
321 return;
322
323 if (_thread.joinable()) {
324 _work.reset();
325 _ioService->stop();
326 _thread.join();
327 }
328
329 for (auto &socket : _sockets) {
330 PRECICE_ASSERT(socket.second->is_open());
331
332 try {
333 socket.second->shutdown(Socket::shutdown_send);
334 socket.second->close();
335 } catch (std::exception &e) {
336 PRECICE_WARN("Socket shutdown failed with system error: {}", e.what());
337 }
338 }
339
340 _isConnected = false;
341}
342
// Synchronously sends a std::string to the process with the given rank.
//
// rankReceiver is first translated with adjustRank(); the socket is then looked
// up in _sockets by the adjusted rank. Any std::exception raised by the writes
// is caught and reported through PRECICE_ERROR.
343void SocketCommunication::send(std::string const &itemToSend, Rank rankReceiver)
344{
345 PRECICE_TRACE(itemToSend, rankReceiver);
346
347 rankReceiver = adjustRank(rankReceiver);
348
349 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
351
// Wire format: a size_t length followed by that many bytes. The length counts
// the terminating '\0' of c_str(), so the terminator is transmitted as well.
// NOTE(review): the length is written as a raw size_t, so both peers presumably
// need the same sizeof(size_t) and byte order - confirm.
352 size_t size = itemToSend.size() + 1;
353 try {
354 asio::write(*_sockets[rankReceiver], asio::buffer(&size, sizeof(size_t)));
355 asio::write(*_sockets[rankReceiver], asio::buffer(itemToSend.c_str(), size));
356 } catch (std::exception &e) {
357 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
358 }
359}
360
362{
363 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
364
365 rankReceiver = adjustRank(rankReceiver);
366
367 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
369
370 try {
371 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)));
372 } catch (std::exception &e) {
373 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
374 }
375}
376
378 std::string const &requesterName)
379{
380 using namespace std::filesystem;
381 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
382 PRECICE_DEBUG("Creating connection exchange directory {}", dir.generic_string());
383 try {
384 create_directories(dir);
385 } catch (const std::filesystem::filesystem_error &e) {
386 PRECICE_WARN("Creating directory for connection info failed with filesystem error: {}", e.what());
387 }
388}
389
391 std::string const &requesterName)
392{
393 using namespace std::filesystem;
394 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
395 PRECICE_DEBUG("Removing connection exchange directory {}", dir.generic_string());
396 try {
397 remove_all(dir);
398 } catch (const std::filesystem::filesystem_error &e) {
399 PRECICE_WARN("Cleaning up connection info failed with filesystem error {}", e.what());
400 }
401}
402
404{
405 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
406
407 rankReceiver = adjustRank(rankReceiver);
408
409 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
411
412 PtrRequest request(new SocketRequest);
413
414 _queue.dispatch(_sockets[rankReceiver],
415 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)),
416 [request] {
417 std::static_pointer_cast<SocketRequest>(request)->complete();
418 });
419 return request;
420}
421
423{
424 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
425
426 rankReceiver = adjustRank(rankReceiver);
427
428 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
430
431 try {
432 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)));
433 } catch (std::exception &e) {
434 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
435 }
436}
437
439{
440 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
441
442 rankReceiver = adjustRank(rankReceiver);
443
444 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
446
447 PtrRequest request(new SocketRequest);
448
449 _queue.dispatch(_sockets[rankReceiver],
450 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)),
451 [request] {
452 std::static_pointer_cast<SocketRequest>(request)->complete();
453 });
454 return request;
455}
456
// Synchronously sends a single double to the process with the given rank.
//
// rankReceiver is translated with adjustRank() before it is used as key into
// _sockets. Any std::exception raised by the write is caught and reported
// through PRECICE_ERROR.
457void SocketCommunication::send(double itemToSend, Rank rankReceiver)
458{
459 PRECICE_TRACE(itemToSend, rankReceiver);
460
461 rankReceiver = adjustRank(rankReceiver);
462
463 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
465
466 try {
// The value is written as its raw sizeof(double) bytes, without any conversion.
467 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(double)));
468 } catch (std::exception &e) {
469 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
470 }
471}
472
473PtrRequest SocketCommunication::aSend(const double &itemToSend, Rank rankReceiver)
474{
475 return aSend(precice::refToSpan<const double>(itemToSend), rankReceiver);
476}
477
// Synchronously sends a single int to the process with the given rank.
//
// rankReceiver is translated with adjustRank() before it is used as key into
// _sockets. Any std::exception raised by the write is caught and reported
// through PRECICE_ERROR.
478void SocketCommunication::send(int itemToSend, Rank rankReceiver)
479{
480 PRECICE_TRACE(itemToSend, rankReceiver);
481
482 rankReceiver = adjustRank(rankReceiver);
483
484 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
486
487 try {
// The value is written as its raw sizeof(int) bytes, without any conversion.
488 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(int)));
489 } catch (std::exception &e) {
490 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
491 }
492}
493
494PtrRequest SocketCommunication::aSend(const int &itemToSend, Rank rankReceiver)
495{
496 return aSend(precice::refToSpan<const int>(itemToSend), rankReceiver);
497}
498
// Synchronously sends a single bool to the process with the given rank.
//
// rankReceiver is translated with adjustRank() before it is used as key into
// _sockets. Any std::exception raised by the write is caught and reported
// through PRECICE_ERROR.
499void SocketCommunication::send(bool itemToSend, Rank rankReceiver)
500{
501 PRECICE_TRACE(itemToSend, rankReceiver);
502
503 rankReceiver = adjustRank(rankReceiver);
504
505 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
507
508 try {
// The value is written as its raw sizeof(bool) bytes, without any conversion.
509 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(bool)));
510 } catch (std::exception &e) {
511 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
512 }
513}
514
// Asynchronously sends a single bool to the process with the given rank.
//
// The data is handed to _queue.dispatch() together with the socket of the
// adjusted rank; the returned SocketRequest is completed from the callback
// passed to dispatch().
// NOTE(review): the buffer stores the address of itemToSend, not a copy of the
// value, so the referenced bool presumably has to stay alive and unchanged until
// the request has completed - confirm against the send queue implementation.
515PtrRequest SocketCommunication::aSend(const bool &itemToSend, Rank rankReceiver)
516{
517 PRECICE_TRACE(rankReceiver);
518
519 rankReceiver = adjustRank(rankReceiver);
520
521 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
523
524 PtrRequest request(new SocketRequest);
525
// The callback captures the request by value, which keeps it alive until the
// callback has run.
526 _queue.dispatch(_sockets[rankReceiver],
527 asio::buffer(&itemToSend, sizeof(bool)),
528 [request] {
529 std::static_pointer_cast<SocketRequest>(request)->complete();
530 });
531 return request;
532}
533
// Synchronously receives a std::string from the process with the given rank.
//
// rankSender is translated with adjustRank() before it is used as key into
// _sockets. Any std::exception raised by the reads is caught and reported
// through PRECICE_ERROR.
534void SocketCommunication::receive(std::string &itemToReceive, Rank rankSender)
535{
536 PRECICE_TRACE(rankSender);
537
538 rankSender = adjustRank(rankSender);
539
540 PRECICE_ASSERT(rankSender >= 0, rankSender);
542
543 size_t size = 0;
544
545 try {
// First read the announced payload length, then exactly that many bytes
// into a temporary buffer.
546 asio::read(*_sockets[rankSender], asio::buffer(&size, sizeof(size_t)));
547 std::vector<char> msg(size);
548 asio::read(*_sockets[rankSender], asio::buffer(msg.data(), size));
// Assigned as a C string: copying stops at the first '\0' in the buffer.
// NOTE(review): this relies on the peer having sent a NUL-terminated payload of
// at least one byte (the std::string overload of send() transmits size() + 1
// bytes); a received size of 0 or a missing terminator would not be safe here.
549 itemToReceive = msg.data();
550 } catch (std::exception &e) {
551 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
552 }
553}
554
556{
557 PRECICE_TRACE(itemsToReceive.size(), rankSender);
558
559 rankSender = adjustRank(rankSender);
560
561 PRECICE_ASSERT(rankSender >= 0, rankSender);
563
564 try {
565 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(int)));
566 } catch (std::exception &e) {
567 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
568 }
569}
570
572{
573 PRECICE_TRACE(itemsToReceive.size(), rankSender);
574
575 rankSender = adjustRank(rankSender);
576
577 PRECICE_ASSERT(rankSender >= 0, rankSender);
579
580 try {
581 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)));
582 } catch (std::exception &e) {
583 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
584 }
585}
586
588 int rankSender)
589{
590 PRECICE_TRACE(itemsToReceive.size(), rankSender);
591
592 rankSender = adjustRank(rankSender);
593
594 PRECICE_ASSERT(rankSender >= 0, rankSender);
596
597 PtrRequest request(new SocketRequest);
598
599 try {
600 asio::async_read(*_sockets[rankSender],
601 asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)),
602 [request](boost::system::error_code const &, std::size_t) {
603 std::static_pointer_cast<SocketRequest>(request)->complete();
604 });
605 } catch (std::exception &e) {
606 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
607 }
608
609 return request;
610}
611
// Synchronously receives a single double from the process with the given rank.
//
// rankSender is translated with adjustRank() before it is used as key into
// _sockets. Any std::exception raised by the read is caught and reported
// through PRECICE_ERROR.
612void SocketCommunication::receive(double &itemToReceive, Rank rankSender)
613{
614 PRECICE_TRACE(rankSender);
615
616 rankSender = adjustRank(rankSender);
617
618 PRECICE_ASSERT(rankSender >= 0, rankSender);
620
621 try {
// Reads exactly sizeof(double) raw bytes directly into itemToReceive.
622 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(double)));
623 } catch (std::exception &e) {
624 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
625 }
626}
627
628PtrRequest SocketCommunication::aReceive(double &itemToReceive, Rank rankSender)
629{
630 return aReceive(precice::refToSpan<double>(itemToReceive), rankSender);
631}
632
// Synchronously receives a single int from the process with the given rank.
//
// rankSender is translated with adjustRank() before it is used as key into
// _sockets. Any std::exception raised by the read is caught and reported
// through PRECICE_ERROR.
633void SocketCommunication::receive(int &itemToReceive, Rank rankSender)
634{
635 PRECICE_TRACE(rankSender);
636
637 rankSender = adjustRank(rankSender);
638
639 PRECICE_ASSERT(rankSender >= 0, rankSender);
641
642 try {
// Reads exactly sizeof(int) raw bytes directly into itemToReceive.
643 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(int)));
644 } catch (std::exception &e) {
645 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
646 }
647}
648
// Asynchronously receives a single int from the process with the given rank.
//
// Starts an asio::async_read on the socket of the adjusted rank and returns a
// SocketRequest that is completed from the read's completion handler.
// itemToReceive is written through its address, so it must remain valid until
// the read has finished.
649PtrRequest SocketCommunication::aReceive(int &itemToReceive, Rank rankSender)
650{
651 PRECICE_TRACE(rankSender);
652
653 rankSender = adjustRank(rankSender);
654
// NOTE(review): unlike the other receive/aReceive overloads in this file, this
// assertion additionally bounds the rank by _sockets.size(). _sockets is a map
// keyed by remote rank, so the bound only holds if the connected ranks are
// contiguous and start at 0 - verify that this is intended.
655 PRECICE_ASSERT((rankSender >= 0) && (rankSender < (int) _sockets.size()),
656 rankSender, _sockets.size());
658
659 PtrRequest request(new SocketRequest);
660
661 try {
// The handler captures the request by value, keeping it alive until it runs.
// NOTE(review): the error_code argument is ignored, so the request is marked
// complete regardless of whether the read succeeded.
662 asio::async_read(*_sockets[rankSender],
663 asio::buffer(&itemToReceive, sizeof(int)),
664 [request](boost::system::error_code const &, std::size_t) {
665 std::static_pointer_cast<SocketRequest>(request)->complete();
666 });
667 } catch (std::exception &e) {
668 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
669 }
670
671 return request;
672}
673
// Synchronously receives a single bool from the process with the given rank.
//
// rankSender is translated with adjustRank() before it is used as key into
// _sockets. Any std::exception raised by the read is caught and reported
// through PRECICE_ERROR.
674void SocketCommunication::receive(bool &itemToReceive, Rank rankSender)
675{
676 PRECICE_TRACE(rankSender);
677
678 rankSender = adjustRank(rankSender);
679
680 PRECICE_ASSERT(rankSender >= 0, rankSender);
682
683 try {
// Reads exactly sizeof(bool) raw bytes directly into itemToReceive.
684 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(bool)));
685 } catch (std::exception &e) {
686 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
687 }
688}
689
// Asynchronously receives a single bool from the process with the given rank.
//
// Starts an asio::async_read on the socket of the adjusted rank and returns a
// SocketRequest that is completed from the read's completion handler.
// itemToReceive is written through its address, so it must remain valid until
// the read has finished.
690PtrRequest SocketCommunication::aReceive(bool &itemToReceive, Rank rankSender)
691{
692 PRECICE_TRACE(rankSender);
693
694 rankSender = adjustRank(rankSender);
695
696 PRECICE_ASSERT(rankSender >= 0, rankSender);
698
699 PtrRequest request(new SocketRequest);
700
701 try {
// The handler captures the request by value, keeping it alive until it runs.
// NOTE(review): the error_code argument is ignored, so the request is marked
// complete regardless of whether the read succeeded.
702 asio::async_read(*_sockets[rankSender],
703 asio::buffer(&itemToReceive, sizeof(bool)),
704 [request](boost::system::error_code const &, std::size_t) {
705 std::static_pointer_cast<SocketRequest>(request)->complete();
706 });
707 } catch (std::exception &e) {
708 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
709 }
710
711 return request;
712}
713
714#ifndef _WIN32
namespace {

/// One local network interface as collected by detectInterfaces().
struct Interface {
  unsigned int index;   ///< Interface index, copied from if_nameindex::if_index.
  std::string  name;    ///< Interface name, copied from if_nameindex::if_name.
  std::string  address; ///< Textual IPv4 address; stays empty if no address could be queried.
};

/**
 * @brief Lists the network interfaces of this host together with their IPv4 address.
 *
 * The interface names are enumerated with if_nameindex(). For every interface the
 * address is then queried with the SIOCGIFADDR ioctl and converted to text.
 * Interfaces for which the query or the conversion fails are still returned,
 * but with an empty address.
 *
 * @return All enumerated interfaces; empty if the enumeration itself fails.
 */
std::vector<Interface> detectInterfaces()
{
  std::vector<Interface> interfaces;

  // Collect interface indices and names.
  // if_nameindex() returns a null pointer on failure, so check before iterating.
  struct if_nameindex *nameIndex = if_nameindex();
  if (nameIndex == nullptr) {
    return interfaces;
  }
  // The array is terminated by an entry whose if_index is 0.
  for (const struct if_nameindex *it = nameIndex; it->if_index != 0; ++it) {
    Interface entry;
    entry.index = it->if_index;
    entry.name  = it->if_name;
    interfaces.emplace_back(std::move(entry));
  }
  if_freenameindex(nameIndex);

  // A single socket is sufficient to issue the ioctl for every interface.
  const int socketfd = socket(AF_INET, SOCK_STREAM, 0);
  if (socketfd == -1) {
    return interfaces; // No address can be resolved; names are still reported.
  }

  // Resolve addresses for interfaces
  for (auto &entry : interfaces) {
    // Zero-initialise the request so that ifr_name is always NUL-terminated,
    // even when the name fills all IFNAMSIZ - 1 copied characters.
    struct ifreq request{};
    strncpy(request.ifr_name, entry.name.c_str(), IFNAMSIZ - 1);

    if (ioctl(socketfd, SIOCGIFADDR, &request) != 0) {
      continue; // e.g. the interface has no IPv4 address assigned
    }

    // inet_ntop writes into a caller-provided buffer, whereas inet_ntoa returns
    // a pointer to a shared static buffer.
    char        text[INET_ADDRSTRLEN] = {};
    const auto *ipv4                  = reinterpret_cast<const struct sockaddr_in *>(&request.ifr_addr);
    if (inet_ntop(AF_INET, &ipv4->sin_addr, text, sizeof(text)) == nullptr) {
      continue;
    }
    entry.address = text;
  }
  close(socketfd);

  return interfaces;
}
} // namespace
763#endif
764
766{
768
769#ifdef _WIN32
770 return "127.0.0.1";
771#else
772
773 PRECICE_DEBUG("Looking for IP address of network \"{}\"", _networkName);
774
775 auto interfaces = detectInterfaces();
776
777 auto pos = std::find_if(interfaces.begin(), interfaces.end(),
778 [&](Interface const &interface) { return interface.name == _networkName; });
779 if (pos == interfaces.end()) {
780 PRECICE_DEBUG("There's NOTHING");
782 err << "Cannot find network interface \"" << _networkName << "\". Available interfaces are: ";
783 for (const auto &interface : interfaces) {
784 err << interface.name << ' ';
785 }
786 err << " Please check \"network\" attributes in your configuration file.";
787 PRECICE_ERROR(err.str());
788 }
789
790 // Unconnected interfaces don't have an IP address.
791 PRECICE_CHECK(not pos->address.empty(), "The interface \"{}\" does not have an IP address. Please select another interface.", _networkName);
792
793 PRECICE_DEBUG("Detected network IP address of interface \"{}\": {}.", _networkName, pos->address);
794 return pos->address;
795#endif
796}
797
798} // namespace precice::com
#define PRECICE_ERROR(...)
Definition LogMacros.hpp:15
#define PRECICE_WARN(...)
Definition LogMacros.hpp:11
#define PRECICE_DEBUG(...)
Definition LogMacros.hpp:64
#define PRECICE_TRACE(...)
Definition LogMacros.hpp:95
#define PRECICE_CHECK(check,...)
Definition LogMacros.hpp:35
unsigned int index
std::string address
std::string name
#define PRECICE_ASSERT(...)
Definition assertion.hpp:87
T begin(T... args)
T c_str(T... args)
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true, if a connection to a remote participant has been setup.
virtual int adjustRank(Rank rank) const
Adjusts the given rank based on the _rankOffset.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block if the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Write the string info, e.g. IP:port to the connection info file.
Implements Communication by using sockets.
virtual PtrRequest aReceive(precice::span< double > itemsToReceive, int rankSender) override
Asynchronously receives an array of double values.
virtual void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
virtual void send(std::string const &itemToSend, Rank rankReceiver) override
Sends a std::string to process with given rank.
std::string _addressDirectory
Directory where IP address is exchanged by file.
virtual void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
Accepts connection from another communicator, which has to call requestConnectionAsClient().
virtual void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
std::string _networkName
Name of network to communicate over.
virtual size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
std::map< int, std::shared_ptr< Socket > > _sockets
Remote rank -> socket map.
virtual void receive(std::string &itemToReceive, Rank rankSender) override
Receives a std::string from process with given rank.
virtual void closeConnection() override
Disconnects from communication space, i.e. participant.
virtual void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
unsigned short _portNumber
Port used for socket connection.
SocketCommunication(unsigned short portNumber=0, bool reuseAddress=false, std::string networkName=utils::networking::loopbackInterfaceName(), std::string addressDirectory=".")
virtual PtrRequest aSend(precice::span< const int > itemsToSend, Rank rankReceiver) override
Asynchronously sends an array of integer values.
virtual void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
virtual void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
std::shared_ptr< IOService > _ioService
void dispatch(std::shared_ptr< Socket > sock, boost::asio::const_buffers_1 data, std::function< void()> callback)
Put data in the queue, start processing the queue.
A C++ 11 implementation of the non-owning C++20 std::span type.
Definition span.hpp:284
constexpr pointer data() const noexcept
Definition span.hpp:500
constexpr size_type size() const noexcept
Definition span.hpp:469
T count(T... args)
T data(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T find(T... args)
T generic_string(T... args)
T join(T... args)
T joinable(T... args)
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.
int Rank
Definition Types.hpp:37
STL namespace.
T reset(T... args)
T size(T... args)
T stoul(T... args)
T str(T... args)
T strncpy(T... args)
T substr(T... args)
T to_string(T... args)
T what(T... args)